diff --git a/CMakeLists.txt b/CMakeLists.txt index d6f03811f..cb3a1fb3d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -128,8 +128,8 @@ endif() if( BUILD_WEB_SERVER ) include(./cmake/Web.cmake) - file(COPY "${CMAKE_CURRENT_SOURCE_DIR}/common/proto/common/" - DESTINATION "${CMAKE_CURRENT_SOURCE_DIR}/web/proto/") + file(COPY "${CMAKE_CURRENT_SOURCE_DIR}/common/proto3/common/" + DESTINATION "${CMAKE_CURRENT_SOURCE_DIR}/web/proto3/") if( ENABLE_UNIT_TESTS ) add_test(NAME unit_tests_web COMMAND "${DEPENDENCY_INSTALL_PATH}/nvm/versions/node/${LOCAL_NODE_VERSION}/bin/npm" "test") @@ -199,7 +199,7 @@ endif() if( BUILD_PYTHON_CLIENT ) # make target = pydatafed - file(COPY ${PROJECT_SOURCE_DIR}/external/DataFedDependencies/python/datafed_pkg/requirements.txt DESTINATION ${PROJECT_SOURCE_DIR}/python/datafed_pkg/requirements.txt) + file(COPY ${PROJECT_SOURCE_DIR}/external/DataFedDependencies/python/datafed_pkg/requirements.txt DESTINATION ${PROJECT_SOURCE_DIR}/python/datafed_pkg) add_subdirectory( python EXCLUDE_FROM_ALL ) endif() @@ -227,9 +227,9 @@ if( INSTALL_CORE_SERVER ) endif() if( INSTALL_WEB_SERVER ) - install( FILES ${ProtoFiles} DESTINATION ${DATAFED_INSTALL_PATH}/web ) install( DIRECTORY ${PROJECT_SOURCE_DIR}/web/static DESTINATION ${DATAFED_INSTALL_PATH}/web ) install( DIRECTORY ${PROJECT_SOURCE_DIR}/web/views DESTINATION ${DATAFED_INSTALL_PATH}/web ) + install( DIRECTORY ${PROJECT_SOURCE_DIR}/web/proto3 DESTINATION ${DATAFED_INSTALL_PATH}/web ) install( FILES ${PROJECT_SOURCE_DIR}/web/version.js DESTINATION ${DATAFED_INSTALL_PATH}/web ) endif() diff --git a/common/include/common/IMessage.hpp b/common/include/common/IMessage.hpp index 679d054ec..71aa5bbc7 100644 --- a/common/include/common/IMessage.hpp +++ b/common/include/common/IMessage.hpp @@ -17,16 +17,45 @@ class Message; namespace SDMS { -enum class MessageType { GOOGLE_PROTOCOL_BUFFER, STRING }; +/** + * @enum MessageType + * @brief Discriminator for the payload type carried by an IMessage. + */ +enum class MessageType { + GOOGLE_PROTOCOL_BUFFER, ///< Payload is a google::protobuf::Message. + STRING ///< Payload is a plain std::string. +}; /** - * The message is on its way to a server this it is a REQUEST - * The message is on its way from a server then it is a RESPONSE - **/ + * @enum MessageState + * @brief Indicates the directional state of a message relative to a server. + * + * - @c REQUEST — the message is traveling *toward* a server. + * - @c RESPONSE — the message is traveling *from* a server. + */ enum class MessageState { REQUEST, RESPONSE }; -enum class MessageAttribute { ID, KEY, STATE, CORRELATION_ID }; +/** + * @enum MessageAttribute + * @brief Named keys for the core metadata attributes stored on a message. + */ +enum class MessageAttribute { + ID, ///< Unique message identifier. + KEY, ///< Routing or lookup key. + STATE, ///< Current MessageState (REQUEST or RESPONSE). + CORRELATION_ID ///< Identifier used to correlate requests with responses. +}; +/** + * @brief Convert a MessageAttribute to its human-readable string form. + * + * @param[in] attribute The attribute to stringify. + * @return A string representation, or @c "unsupported_toString_print" if + * the attribute does not have a defined conversion. + * + * @note Only ID and KEY are currently supported; all other values fall + * through to the unsupported case. + */ inline const std::string toString(const MessageAttribute attribute) { if (attribute == MessageAttribute::ID) { return std::string("ID"); @@ -37,65 +66,186 @@ inline const std::string toString(const MessageAttribute attribute) { } } +/// @brief Well-known string constants used throughout the messaging layer. namespace constants { namespace message { namespace google { -/// Supported dynamic arguments +/// @brief Key for the serialized frame size of a protobuf message. const std::string FRAME_SIZE = "frame_size"; +/// @brief Key for the integral message-type identifier (e.g. envelope field number). const std::string MSG_TYPE = "msg_type"; +/// @brief Key for the security/session context attached to a message. const std::string CONTEXT = "context"; } // namespace google } // namespace message } // namespace constants +/** + * @class IMessage + * @brief Abstract interface for all messages exchanged through the SDMS + * messaging infrastructure. + * + * IMessage defines a uniform contract for: + * - **Payload management** — carrying either a protobuf Message or a raw + * string, with ownership semantics enforced by the implementation. + * - **Routing** — an ordered list of route identifiers that determines + * how a message is forwarded through the system. + * - **Metadata** — core attributes (ID, KEY, STATE, CORRELATION_ID) plus + * arbitrary named attributes with small-integer values. + * + * Implementations are expected to own the payload and manage its lifetime. + * Callers retrieve payload data via non-owning raw pointers or copies. + */ class IMessage { public: - + /// @brief Virtual destructor. virtual ~IMessage() {}; - virtual bool exists(MessageAttribute) const = 0; - virtual bool exists(const std::string &) const = 0; /** + * @brief Check whether a core metadata attribute has been set. + * + * @param[in] attribute The attribute to query. + * @return @c true if the attribute is present on this message. + */ + virtual bool exists(MessageAttribute attribute) const = 0; + + /** + * @brief Check whether a named (dynamic) attribute has been set. + * + * @param[in] attribute_name The name of the dynamic attribute to query. + * @return @c true if the named attribute is present on this message. + */ + virtual bool exists(const std::string &attribute_name) const = 0; + + /*--------------------------------------------------------------------- * Setters - **/ + *-------------------------------------------------------------------*/ + + /** + * @brief Set the message payload. + * + * The implementation must take ownership of the supplied payload. For + * protobuf payloads the unique_ptr is moved in; for string payloads the + * string is copied or moved. After this call the IMessage instance is + * solely responsible for the lifetime of the payload data. + * + * @param[in] payload A variant holding either a protobuf message + * (via @c std::unique_ptr) or a plain string. + */ + virtual void setPayload( + std::variant, + std::string> + payload) = 0; /** - * Adding a payload should make a copy and store internally. It should - * Imply ownership of the payload and it's memory management. - **/ - virtual void - setPayload(std::variant, - std::string>) = 0; + * @brief Append a single route to the end of the routing list. + * + * @param[in] route The route identifier to append. + */ virtual void addRoute(const std::string &route) = 0; + /** + * @brief Replace the entire routing list. + * + * @param[in] routes The new ordered list of route identifiers. + */ virtual void setRoutes(const std::list &routes) = 0; - virtual void set(MessageAttribute, const std::string &) = 0; - virtual void set(MessageAttribute, MessageState) = 0; + /** + * @brief Set a core metadata attribute to a string value. + * + * Applicable to attributes such as @c ID, @c KEY, and + * @c CORRELATION_ID. + * + * @param[in] attribute The attribute to set. + * @param[in] value The string value to assign. + */ + virtual void set(MessageAttribute attribute, const std::string &value) = 0; - virtual void set(std::string attribute_name, - std::variant) = 0; /** + * @brief Set a core metadata attribute to a MessageState value. + * + * Intended for the @c STATE attribute. + * + * @param[in] attribute The attribute to set (expected: @c STATE). + * @param[in] state The MessageState value to assign. + */ + virtual void set(MessageAttribute attribute, MessageState state) = 0; + + /** + * @brief Set a named dynamic attribute to a small unsigned integer. + * + * Dynamic attributes are identified by string keys (e.g. the constants + * in @c SDMS::constants::message::google). + * + * @param[in] attribute_name The name of the dynamic attribute. + * @param[in] value The value, stored as one of uint8/16/32. + */ + virtual void set(std::string attribute_name, + std::variant value) = 0; + + /*--------------------------------------------------------------------- * Getters - **/ + *-------------------------------------------------------------------*/ /** - * The correlation ID is assigned to a message when it is created and is - *extremely important for tracing a message in the logs. - **/ + * @brief Retrieve a core metadata attribute. + * + * The returned variant holds a @c std::string for most attributes, or a + * @c MessageState when querying @c STATE. + * + * @param[in] attribute The attribute to retrieve. + * @return A variant containing either the string value or the + * MessageState, depending on the attribute. + */ virtual std::variant - get(MessageAttribute) const = 0; + get(MessageAttribute attribute) const = 0; + + /** + * @brief Get a const reference to the ordered routing list. + * + * @return Const reference to the internal route list. + */ virtual const std::list &getRoutes() const = 0; + + /** + * @brief Get a mutable reference to the ordered routing list. + * + * @return Mutable reference to the internal route list. + */ virtual std::list &getRoutes() = 0; + + /** + * @brief Return the payload type carried by this message. + * + * @return The MessageType discriminator. + */ virtual MessageType type() const noexcept = 0; + + /** + * @brief Retrieve a named dynamic attribute. + * + * @param[in] attribute_name The name of the dynamic attribute. + * @return The value stored as a @c uint8_t, @c uint16_t, or @c uint32_t + * variant. + */ virtual std::variant get(const std::string &attribute_name) const = 0; - /// Note not returning a unique_ptr but a raw pointer because the message - // should stil have ownership of the object. + /** + * @brief Retrieve a non-owning handle to the message payload. + * + * Ownership remains with the IMessage instance. For protobuf payloads + * a raw pointer is returned (not a @c unique_ptr) to make this + * explicit. For string payloads the string is returned by value (copy). + * + * @return A variant holding either a non-owning + * @c google::protobuf::Message* or a @c std::string. + */ virtual std::variant getPayload() = 0; }; } // namespace SDMS + #endif // MESSAGE_HPP diff --git a/common/include/common/ProtoBufMap.hpp b/common/include/common/ProtoBufMap.hpp index b1da59161..e30f62b90 100644 --- a/common/include/common/ProtoBufMap.hpp +++ b/common/include/common/ProtoBufMap.hpp @@ -1,3 +1,14 @@ +/** + * @file ProtoBufMap.hpp + * @brief Provides message type mapping and envelope wrap/unwrap for the SDMS + * protobuf messaging layer. + * + * ProtoBufMap maintains bidirectional mappings between envelope field numbers + * (used as stable message type identifiers) and protobuf Descriptor objects. + * It also handles serialization boundary concerns via the Envelope message + * pattern, wrapping outgoing messages and unwrapping incoming ones. + */ + #ifndef PROTOBUFMAP_HPP #define PROTOBUFMAP_HPP #pragma once @@ -17,47 +28,154 @@ namespace SDMS { +/** + * @class ProtoBufMap + * @brief Bidirectional message type registry and envelope serialization layer. + * + * Maps between uint16_t message type identifiers (derived from envelope field + * numbers) and protobuf Descriptor/FileDescriptor objects. Implements the + * IMessageMapper interface and provides envelope wrap/unwrap operations for + * wire-boundary serialization. + * + * Message type identifiers are organized by category: + * - 10–29: Anonymous (no-auth) messages + * - 200–249: Record messages + * - etc. + * + * @see IMessageMapper + * @see SDMS::Envelope + */ class ProtoBufMap : public IMessageMapper { public: + /** @brief Maps a file index to its protobuf FileDescriptor. */ typedef std::map FileDescriptorMap; + + /** @brief Maps a message type ID (envelope field number) to its Descriptor. */ typedef std::map DescriptorMap; + + /** @brief Reverse mapping from Descriptor back to message type ID. */ typedef std::map MsgTypeMap; private: - FileDescriptorMap m_file_descriptor_map; - DescriptorMap m_descriptor_map; - MsgTypeMap m_msg_type_map; + FileDescriptorMap m_file_descriptor_map; ///< Registered file descriptors. + DescriptorMap m_descriptor_map; ///< Type ID → Descriptor lookup. + MsgTypeMap m_msg_type_map; ///< Descriptor → type ID lookup. public: + /** + * @brief Constructs the map and populates all mappings from the Envelope + * descriptor's oneof fields. + * + * Iterates the Envelope message descriptor via reflection to register every + * message field, using each field's number as the stable type identifier. + */ ProtoBufMap(); + /** + * @brief Retrieves the Descriptor for a given message type ID. + * + * @param message_type Envelope field number identifying the message type. + * @return Pointer to the Descriptor, or nullptr if not found. + */ const ::google::protobuf::Descriptor * getDescriptorType(uint16_t message_type) const; + + /** + * @brief Checks whether a message type ID is registered. + * + * @param message_type Envelope field number to look up. + * @return True if the type is registered, false otherwise. + */ bool exists(uint16_t message_type) const; - uint16_t getMessageType(const ::google::protobuf::Message& msg) const; + + /** + * @brief Resolves the message type ID for a concrete protobuf message. + * + * @param msg A protobuf message instance. + * @return The envelope field number corresponding to the message's type. + * @throws std::runtime_error If the message type is not registered. + */ + uint16_t getMessageType(const ::google::protobuf::Message &msg) const; + + /** + * @brief Returns a human-readable name for a message type ID. + * + * @param MessageType Envelope field number identifying the message type. + * @return The full protobuf type name, or an error string if not found. + */ std::string toString(uint16_t MessageType) const; - virtual uint16_t getMessageType(const std::string& message_name) const final; - - // Envelope wrap/unwrap for wire boundary + + /** + * @brief Resolves a message type ID from a protobuf message type name string. + * + * @param message_name Fully-qualified or short protobuf message name. + * @return The corresponding envelope field number. + * @throws std::runtime_error If the name does not match any registered type. + */ + virtual uint16_t + getMessageType(const std::string &message_name) const final; + + /** + * @brief Wraps an inner message into an Envelope for wire transmission. + * + * Resolves the inner message's type ID via getMessageType(), then uses + * reflection to set the corresponding oneof field in a new Envelope by + * copying from the inner message. + * + * @param inner The message to wrap. + * @return A populated Envelope ready for serialization. + * @throws EC_INVALID_PARAM If the resolved field number does not exist + * in the Envelope descriptor. + */ std::unique_ptr - wrapInEnvelope(const ::google::protobuf::Message& inner) const; + wrapInEnvelope(const ::google::protobuf::Message &inner) const; + /** + * @brief Extracts the inner message from a received Envelope. + * + * Uses reflection to determine which oneof field is set, then calls + * ReleaseMessage() to transfer ownership of the inner message out of + * the envelope. After this call, the released field in @p envelope is + * cleared. + * + * @param envelope The received Envelope to unwrap. Modified in place — + * the released field is no longer owned by the envelope. + * @return The extracted inner message. Caller takes ownership. + * @throws EC_INVALID_PARAM If the envelope's message type cannot be + * resolved to a valid field. + */ std::unique_ptr<::google::protobuf::Message> - unwrapFromEnvelope(SDMS::Envelope& envelope) const; + unwrapFromEnvelope(SDMS::Envelope &envelope) const; - // Check if message type requires authentication - virtual bool requiresAuth(const std::string& msg_type) const final { + /** + * @brief Determines whether a message type requires authentication. + * + * Messages in the anonymous set (e.g., version handshake, authentication + * requests) do not require a prior authenticated session. All other + * message types do. + * + * @param msg_type Short message type name (e.g., "VersionRequest"). + * @return True if the message requires an authenticated session, false if + * it is in the anonymous (no-auth) set. + */ + virtual bool requiresAuth(const std::string &msg_type) const final { static const std::unordered_set anon_types = { - "AckReply", "NackReply", "VersionRequest", "VersionReply", - "GetAuthStatusRequest", "AuthenticateByPasswordRequest", - "AuthenticateByTokenRequest", "AuthStatusReply", - "DailyMessageRequest", "DailyMessageReply" - }; + "AckReply", + "NackReply", + "VersionRequest", + "VersionReply", + "GetAuthStatusRequest", + "AuthenticateByPasswordRequest", + "AuthenticateByTokenRequest", + "AuthStatusReply", + "DailyMessageRequest", + "DailyMessageReply"}; return anon_types.count(msg_type) == 0; } }; + } // namespace SDMS #endif // PROTOBUFMAP_HPP diff --git a/common/source/Frame.hpp b/common/source/Frame.hpp index 5c68a2a25..aaa263ac4 100644 --- a/common/source/Frame.hpp +++ b/common/source/Frame.hpp @@ -1,3 +1,14 @@ +/** + * @file Frame.hpp + * @brief Wire-level frame header and conversion utilities for the SDMS + * messaging transport layer. + * + * Defines the fixed-size Frame header that prefixes every message on the wire, + * along with FrameConverter (for copying between Frame and ZMQ/IMessage + * representations) and FrameFactory (for constructing Frame headers from + * various message sources). + */ + #ifndef FRAME_HPP #define FRAME_HPP #pragma once @@ -16,36 +27,119 @@ namespace SDMS { // Forward declarations class ProtoBufMap; +/** + * @struct Frame + * @brief Fixed-size 8-byte header that precedes every message on the wire. + * + * Contains the serialized payload size, the message type identifier (envelope + * field number), and an optional application-defined context value. Fields are + * converted to/from network byte order at the ZMQ serialization boundary. + */ struct Frame { + uint32_t size = 0; ///< Size of the serialized payload in bytes. + uint16_t msg_type = 0; ///< Message type ID (envelope field number). + uint16_t context = 0; ///< Optional application-defined context value. - uint32_t size = 0; ///< Size of buffer in bytes - uint16_t msg_type = 0; - uint16_t context = 0; ///< Optional context value - + /** + * @brief Resets all fields to zero. + */ void clear() { size = 0; msg_type = 0; context = 0; } - }; +/** + * @class FrameConverter + * @brief Copies data between Frame headers and ZMQ or IMessage representations. + * + * Provides bidirectional conversion for ZMQ messages. The IMessage overload + * only supports the FROM_FRAME direction (Frame → IMessage); the reverse + * is unsupported and will throw. + */ class FrameConverter { public: - /** - * Make sure that zmq_msg_init is not called on this message before it - * is passed in. - **/ - enum class CopyDirection { TO_FRAME, FROM_FRAME }; + /** @brief Specifies the direction of a copy operation. */ + enum class CopyDirection { + TO_FRAME, ///< Deserialize: copy from source into Frame. + FROM_FRAME ///< Serialize: copy from Frame into destination. + }; + /** + * @brief Copies between a raw ZMQ message buffer and a Frame. + * + * The ZMQ message must be pre-initialized and sized to exactly 8 bytes + * (sizeof(Frame)) before calling. For the FROM_FRAME direction, use: + * @code + * zmq_msg_init_size(&zmq_msg, 8); + * @endcode + * + * @param copy Direction of the copy operation. + * @param zmq_msg Pre-initialized ZMQ message of exactly 8 bytes. + * @param frame Frame to populate (TO_FRAME) or read from (FROM_FRAME). + * + * @throws TraceException if zmq_msg size != sizeof(Frame). + */ void copy(CopyDirection copy, zmq_msg_t &zmq_msg, Frame &frame); + + /** + * @brief Copies frame fields from a Frame into an IMessage. + * + * @note Only CopyDirection::FROM_FRAME is supported. Passing TO_FRAME + * will throw. + * + * @param copy Must be CopyDirection::FROM_FRAME. + * @param msg IMessage to populate with frame metadata. + * @param frame Source Frame to read from. + * + * @throws TraceException if direction is TO_FRAME. + */ void copy(CopyDirection copy, IMessage &msg, const Frame &frame); }; +/** + * @class FrameFactory + * @brief Constructs Frame headers from various message representations. + * + * Each overload computes the payload size and resolves the message type ID + * appropriate to the source type. The context field is only populated when + * constructing from an IMessage that carries it. + */ class FrameFactory { public: + /** + * @brief Creates a Frame from a protobuf Message, resolving the type ID + * via ProtoBufMap. + * + * The context field is left at its default (0). + * + * @param a_msg Protobuf message (ByteSizeLong() determines frame size). + * @param proto_map Registry used to look up the message type ID. + * @return A Frame with size and msg_type populated. + */ Frame create(::google::protobuf::Message &a_msg, ProtoBufMap &proto_map); + + /** + * @brief Creates a Frame by extracting metadata from an IMessage. + * + * All three fields (FRAME_SIZE, MSG_TYPE, CONTEXT) must be present in the + * IMessage or the call will throw. + * + * @param msg The IMessage containing frame metadata. + * @return A fully populated Frame. + * @throws TraceException if any required field is missing from the IMessage. + */ Frame create(const IMessage &msg); + + /** + * @brief Creates a Frame by deserializing a raw ZMQ message. + * + * Returns a zeroed Frame if the ZMQ message is empty. + * + * @param zmq_msg Raw ZMQ message containing serialized frame bytes. + * @return The deserialized Frame, or a zeroed Frame if empty. + */ Frame create(zmq_msg_t &zmq_msg); }; diff --git a/common/source/ProtoBufMap.cpp b/common/source/ProtoBufMap.cpp index a24b7b47e..731009370 100644 --- a/common/source/ProtoBufMap.cpp +++ b/common/source/ProtoBufMap.cpp @@ -16,6 +16,12 @@ ProtoBufMap::ProtoBufMap() {} std::unique_ptr ProtoBufMap::wrapInEnvelope(const proto::Message& inner) const { + // If already an Envelope, return a copy — no double-wrapping + const auto* env = dynamic_cast(&inner); + if (env) { + return std::make_unique(*env); + } + uint16_t field_number = getMessageType(inner); auto envelope = std::make_unique(); const auto* field_desc = diff --git a/common/tests/unit/test_Proxy.cpp b/common/tests/unit/test_Proxy.cpp index 62721e2d4..b4ab4c4fe 100644 --- a/common/tests/unit/test_Proxy.cpp +++ b/common/tests/unit/test_Proxy.cpp @@ -228,7 +228,7 @@ BOOST_AUTO_TEST_CASE(testing_Proxy) { std::move(incoming_operators), log_context_proxy); std::chrono::duration duration = - std::chrono::milliseconds(2000); + std::chrono::milliseconds(1000); proxy.setRunDuration(duration); proxy.run(); @@ -448,7 +448,7 @@ BOOST_AUTO_TEST_CASE(testing_Proxy2) { log_context_proxy_middle); std::chrono::duration duration = - std::chrono::milliseconds(800); + std::chrono::milliseconds(400); proxy.setRunDuration(duration); proxy.run(); diff --git a/common/tests/unit/test_ProxyBasicZMQ.cpp b/common/tests/unit/test_ProxyBasicZMQ.cpp index 90f49a454..a5e319ddd 100644 --- a/common/tests/unit/test_ProxyBasicZMQ.cpp +++ b/common/tests/unit/test_ProxyBasicZMQ.cpp @@ -218,7 +218,7 @@ BOOST_AUTO_TEST_CASE(testing_ProxyBasicZMQ) { log_context1); std::chrono::duration duration = - std::chrono::milliseconds(2000); + std::chrono::milliseconds(1000); proxy.setRunDuration(duration); proxy.run(); @@ -261,7 +261,7 @@ BOOST_AUTO_TEST_CASE(testing_ProxyBasicZMQ) { ICommunicator::Response response = server->receive(MessageType::GOOGLE_PROTOCOL_BUFFER); - std::chrono::duration duration = std::chrono::milliseconds(800); + std::chrono::duration duration = std::chrono::milliseconds(400); auto end_time = std::chrono::steady_clock::now() + duration; while (response.time_out and end_time > std::chrono::steady_clock::now()) { diff --git a/core/server/DatabaseAPI.cpp b/core/server/DatabaseAPI.cpp index 7363d4c18..7a297b21e 100644 --- a/core/server/DatabaseAPI.cpp +++ b/core/server/DatabaseAPI.cpp @@ -7,7 +7,6 @@ #include "common/envelope.pb.h" #include "common/TraceException.hpp" #include "common/Util.hpp" -#include "common/envelope.pb.h" #include "common/enums/access_token_type.pb.h" #include "common/enums/search_mode.pb.h" diff --git a/docker/Dockerfile.runtime b/docker/Dockerfile.runtime index eb564dcc9..b8c8bfff1 100644 --- a/docker/Dockerfile.runtime +++ b/docker/Dockerfile.runtime @@ -27,6 +27,7 @@ COPY ./scripts/dependency_versions.sh ${BUILD_DIR}/scripts/ RUN mkdir -p ${DATAFED_DIR} RUN mkdir -p /opt/datafed RUN mkdir -p /var/log/datafed +RUN mkdir -p /opt/datafed/logs RUN chown -R datafed:root /opt/datafed RUN chown -R datafed:root /var/log/datafed RUN chown -R datafed:root ${DATAFED_DIR} diff --git a/python/datafed_pkg/CMakeLists.txt b/python/datafed_pkg/CMakeLists.txt index 730fbed68..ac834383e 100644 --- a/python/datafed_pkg/CMakeLists.txt +++ b/python/datafed_pkg/CMakeLists.txt @@ -35,4 +35,4 @@ endforeach() add_subdirectory( datafed ) add_custom_target( pydatafed ) -add_dependencies( pydatafed pydatafed_src) +add_dependencies( pydatafed pydatafed_proto_src) diff --git a/python/datafed_pkg/datafed/CLI.py b/python/datafed_pkg/datafed/CLI.py index 949d535ae..27918efba 100644 --- a/python/datafed_pkg/datafed/CLI.py +++ b/python/datafed_pkg/datafed/CLI.py @@ -34,8 +34,6 @@ from prompt_toolkit.history import FileHistory from prompt_toolkit.auto_suggest import AutoSuggestFromHistory -# from . import SDMS_Auth_pb2 as auth -from . import Version_pb2 from . import CommandLib from . import Config from . import VERSION @@ -162,14 +160,14 @@ def run(): except _NoCommand as e: # Be nice and switch to interactive when no command given if _interactive and _first: - api_version = f"{Version_pb2.DATAFED_COMMON_PROTOCOL_API_MAJOR}." - api_version += f"{Version_pb2.DATAFED_COMMON_PROTOCOL_API_MINOR}." - api_version += f"{Version_pb2.DATAFED_COMMON_PROTOCOL_API_PATCH}" - release_version = f"{Version_pb2.DATAFED_RELEASE_YEAR}." - release_version += f"{Version_pb2.DATAFED_RELEASE_MONTH}." - release_version += f"{Version_pb2.DATAFED_RELEASE_DAY}." - release_version += f"{Version_pb2.DATAFED_RELEASE_HOUR}." - release_version += f"{Version_pb2.DATAFED_RELEASE_MINUTE}" + api_version = f"{VERSION.DATAFED_COMMON_PROTOCOL_API_MAJOR}." + api_version += f"{VERSION.DATAFED_COMMON_PROTOCOL_API_MINOR}." + api_version += f"{VERSION.DATAFED_COMMON_PROTOCOL_API_PATCH}" + release_version = f"{VERSION.DATAFED_RELEASE_YEAR}." + release_version += f"{VERSION.DATAFED_RELEASE_MONTH}." + release_version += f"{VERSION.DATAFED_RELEASE_DAY}." + release_version += f"{VERSION.DATAFED_RELEASE_HOUR}." + release_version += f"{VERSION.DATAFED_RELEASE_MINUTE}" _print_msg(1, f"Welcome to DataFed CLI, version {VERSION.__version__}") _print_msg( 1, " Release, version {}".format(release_version) diff --git a/python/datafed_pkg/datafed/CMakeLists.txt b/python/datafed_pkg/datafed/CMakeLists.txt index 9e066deea..900b953cd 100644 --- a/python/datafed_pkg/datafed/CMakeLists.txt +++ b/python/datafed_pkg/datafed/CMakeLists.txt @@ -1,42 +1,60 @@ -cmake_minimum_required (VERSION 3.17.0) +cmake_minimum_required(VERSION 3.17.0) # Copy py source to build package source dir -file( GLOB SrcFiles ${CMAKE_CURRENT_SOURCE_DIR}/*.py ) +file(GLOB SrcFiles ${CMAKE_CURRENT_SOURCE_DIR}/*.py) foreach(file ${SrcFiles}) - configure_file(${file} ${CMAKE_CURRENT_BINARY_DIR} COPYONLY ) + configure_file(${file} ${CMAKE_CURRENT_BINARY_DIR} COPYONLY) endforeach() -# Collect top-level proto files as dependencies -file( GLOB ProtoFiles ${DataFed_SOURCE_DIR}/common/proto/common/*.proto ) +# Collect proto files from the new 1-1-1 directory structure +file(GLOB_RECURSE ProtoFiles ${DataFed_SOURCE_DIR}/common/proto3/common/*.proto) # OBJECT - is needed because we don't want to compile to a binary # because we are dealing with python add_library(protobuf-target-py OBJECT ${ProtoFiles}) + protobuf_generate( LANGUAGE python TARGET protobuf-target-py - IMPORT_DIRS "${DataFed_SOURCE_DIR}/common/proto/common" + IMPORT_DIRS "${DataFed_SOURCE_DIR}/common/proto3/common" OUT_VAR protobuf-generated-files-py PROTOC_OUT_DIR "${CMAKE_CURRENT_BINARY_DIR}" - ) - -add_custom_target( pydatafed_src DEPENDS protobuf-target-py ) - -# By default this will output the proto py files in the CMAKE BINARY DIR -add_custom_command( TARGET pydatafed_src POST_BUILD - COMMAND sed -i -r 's:^import.*_pb2:from . \\0:' ${protobuf-generated-files-py} - COMMAND ${DataFed_SOURCE_DIR}/python/pyproto_add_msg_idx.py ${DataFed_SOURCE_DIR}/common/proto/common/SDMS_Anon.proto ${CMAKE_CURRENT_BINARY_DIR}/SDMS_Anon_pb2.py - COMMAND ${DataFed_SOURCE_DIR}/python/pyproto_add_msg_idx.py ${DataFed_SOURCE_DIR}/common/proto/common/SDMS_Auth.proto ${CMAKE_CURRENT_BINARY_DIR}/SDMS_Auth_pb2.py -) - -# Crea#te copies of the files so they show up in the source folder as well -# for the purpose of testing -add_custom_target( pydatafed_proto_src DEPENDS pydatafed_src ) -add_custom_command( TARGET pydatafed_proto_src POST_BUILD pydatafed_src - COMMAND cp ${CMAKE_CURRENT_BINARY_DIR}/SDMS_Auth_pb2.py ${CMAKE_CURRENT_SOURCE_DIR}/ - COMMAND cp ${CMAKE_CURRENT_BINARY_DIR}/SDMS_pb2.py ${CMAKE_CURRENT_SOURCE_DIR}/ - COMMAND cp ${CMAKE_CURRENT_BINARY_DIR}/Version_pb2.py ${CMAKE_CURRENT_SOURCE_DIR}/ - COMMAND cp ${CMAKE_CURRENT_BINARY_DIR}/SDMS_Anon_pb2.py ${CMAKE_CURRENT_SOURCE_DIR}/ ) +add_custom_target(pydatafed_src DEPENDS protobuf-target-py) + +# Proto subdirectories that protoc generates imports for +set(PROTO_SUBDIRS anon auth enums messages) + +# Fix imports in generated pb2 files to use relative imports within the package. +# protoc generates absolute imports like: +# from anon import ack_reply_pb2 as ... +# from enums import error_code_pb2 as ... +# import envelope_pb2 as ... +# These must become relative imports: +# from .anon import ack_reply_pb2 as ... +# from .enums import error_code_pb2 as ... +# from . import envelope_pb2 as ... +# Create the import fixup script +add_custom_command(TARGET pydatafed_src POST_BUILD + COMMAND sh ${DataFed_SOURCE_DIR}/python/datafed_pkg/scripts/fix_proto_imports.sh ${CMAKE_CURRENT_BINARY_DIR} + COMMENT "Rewriting protobuf imports to relative" +) + +# Copy generated files back to source tree for testing +add_custom_target(pydatafed_proto_src DEPENDS pydatafed_src) +add_custom_command(TARGET pydatafed_proto_src POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy + ${CMAKE_CURRENT_BINARY_DIR}/envelope_pb2.py + ${CMAKE_CURRENT_SOURCE_DIR}/ + # Copy subdirectories back to source for testing + COMMAND ${CMAKE_COMMAND} -E copy_directory + ${CMAKE_CURRENT_BINARY_DIR}/anon ${CMAKE_CURRENT_SOURCE_DIR}/anon + COMMAND ${CMAKE_COMMAND} -E copy_directory + ${CMAKE_CURRENT_BINARY_DIR}/auth ${CMAKE_CURRENT_SOURCE_DIR}/auth + COMMAND ${CMAKE_COMMAND} -E copy_directory + ${CMAKE_CURRENT_BINARY_DIR}/enums ${CMAKE_CURRENT_SOURCE_DIR}/enums + COMMAND ${CMAKE_COMMAND} -E copy_directory + ${CMAKE_CURRENT_BINARY_DIR}/messages ${CMAKE_CURRENT_SOURCE_DIR}/messages +) diff --git a/python/datafed_pkg/datafed/CommandLib.py b/python/datafed_pkg/datafed/CommandLib.py index 3f4ad8098..7b9140678 100644 --- a/python/datafed_pkg/datafed/CommandLib.py +++ b/python/datafed_pkg/datafed/CommandLib.py @@ -14,11 +14,9 @@ import time import pathlib import requests -from . import SDMS_Auth_pb2 as auth -from . import SDMS_pb2 as sdms from . import MessageLib from . import Config - +from . import envelope_pb2 as sdms class API: """ @@ -168,7 +166,7 @@ def generateCredentials(self): ------ Exception: On communication or server error """ - msg = auth.GenerateCredentialsRequest() + msg = sdms.GenerateCredentialsRequest() return self._mapi.sendRecv(msg) @@ -236,7 +234,7 @@ def repoCreate( ------ Exception : On communication or server error """ - msg = auth.RepoCreateRequest() + msg = sdms.RepoCreateRequest() msg.id = repo_id msg.title = title msg.desc = desc @@ -260,7 +258,7 @@ def repoList(self, list_all: bool = False): By default will only list the repos associated with the user. """ - msg = auth.RepoListRequest() + msg = sdms.RepoListRequest() msg.all = list_all return self._mapi.sendRecv(msg) @@ -281,7 +279,7 @@ def repoDelete(self, repo_id): ------ Exception : On communication or server error """ - msg = auth.RepoDeleteRequest() + msg = sdms.RepoDeleteRequest() msg.id = repo_id return self._mapi.sendRecv(msg) @@ -289,7 +287,7 @@ def repoAllocationCreate(self, repo_id, subject, data_limit, rec_limit): if not repo_id.startswith("repo/"): repo_id = "repo/" + repo_id - msg = auth.RepoAllocationCreateRequest() + msg = sdms.RepoAllocationCreateRequest() msg.repo = repo_id msg.subject = subject msg.data_limit = data_limit @@ -300,14 +298,14 @@ def repoListAllocations(self, repo_id): if not repo_id.startswith("repo/"): repo_id = "repo/" + repo_id - msg = auth.RepoListAllocationsRequest() + msg = sdms.RepoListAllocationsRequest() msg.id = repo_id return self._mapi.sendRecv(msg) def repoAllocationDelete(self, repo_id, subject): if not repo_id.startswith("repo/"): repo_id = "repo/" + repo_id - msg = auth.RepoAllocationDeleteRequest() + msg = sdms.RepoAllocationDeleteRequest() msg.repo = repo_id msg.subject = subject return self._mapi.sendRecv(msg) @@ -341,7 +339,7 @@ def dataView(self, data_id, details=False, context=None): ------ Exception : On communication or server error """ - msg = auth.RecordViewRequest() + msg = sdms.RecordViewRequest() msg.id = self._resolve_id(data_id, context) msg.details = details @@ -436,7 +434,7 @@ def dataCreate( if metadata and metadata_file: raise Exception("Cannot specify both metadata and metadata-file options.") - msg = auth.RecordCreateRequest() + msg = sdms.RecordCreateRequest() msg.title = title msg.parent_id = self._resolve_id(parent_id, context) @@ -579,7 +577,7 @@ def dataUpdate( if metadata and metadata_file: raise Exception("Cannot specify both metadata and metadata-file options.") - msg = auth.RecordUpdateRequest() + msg = sdms.RecordUpdateRequest() msg.id = self._resolve_id(data_id, context) if title is not None: @@ -673,7 +671,7 @@ def dataDelete(self, data_id, context=None): ------ Exception : On invalid options or communication / server error """ - msg = auth.RecordDeleteRequest() + msg = sdms.RecordDeleteRequest() if isinstance(data_id, list): for i in data_id: @@ -740,7 +738,7 @@ def dataGet( # Request server to map specified IDs into a list of specific record IDs. # This accounts for download of collections. - msg = auth.DataGetRequest() + msg = sdms.DataGetRequest() msg.check = True if isinstance(item_id, str): @@ -761,7 +759,7 @@ def dataGet( if len(glob_ids) > 0: # Globus transfers - msg = auth.DataGetRequest() + msg = sdms.DataGetRequest() msg.id.extend(glob_ids) msg.path = self._resolvePathForGlobus(path, False) msg.encrypt = encrypt @@ -770,7 +768,7 @@ def dataGet( reply = self._mapi.sendRecv(msg) if reply[0].task and wait: - msg2 = auth.TaskViewRequest() + msg2 = sdms.TaskViewRequest() msg2.task_id = reply[0].task.id elapsed = 0 @@ -849,7 +847,7 @@ def dataPut( ------ Exception : On invalid options or communication / server error. """ - msg = auth.DataPutRequest() + msg = sdms.DataPutRequest() msg.id = self._resolve_id(data_id, context) msg.path = self._resolvePathForGlobus(path, False) msg.encrypt = encrypt @@ -859,7 +857,7 @@ def dataPut( reply = self._mapi.sendRecv(msg) if (reply[0].HasField("task")) and wait: - msg2 = auth.TaskViewRequest() + msg2 = sdms.TaskViewRequest() msg2.task_id = reply[0].task.id elapsed = 0 @@ -944,7 +942,7 @@ def dataBatchCreate(self, file, coll_id=None, context=None): payload.extend(records) - msg = auth.RecordCreateBatchRequest() + msg = sdms.RecordCreateBatchRequest() msg.records = jsonlib.dumps(payload) return self._mapi.sendRecv(msg) @@ -998,7 +996,7 @@ def dataBatchUpdate(self, file): else: payload.extend(records) - msg = auth.RecordUpdateBatchRequest() + msg = sdms.RecordUpdateBatchRequest() msg.records = jsonlib.dumps(payload) return self._mapi.sendRecv(msg) @@ -1029,7 +1027,7 @@ def collectionView(self, coll_id, context=None): ------ Exception : On invalid options or communication / server error. """ - msg = auth.CollViewRequest() + msg = sdms.CollViewRequest() msg.id = self._resolve_id(coll_id, context) # msg.id = self._resolve_coll_id( coll_id, context ) @@ -1083,7 +1081,7 @@ def collectionCreate( ------ Exception : On communication or server error """ - msg = auth.CollCreateRequest() + msg = sdms.CollCreateRequest() msg.title = title if alias: @@ -1149,7 +1147,7 @@ def collectionUpdate( ------ Exception : On communication or server error """ - msg = auth.CollUpdateRequest() + msg = sdms.CollUpdateRequest() msg.id = self._resolve_id(coll_id, context) if title is not None: @@ -1197,7 +1195,7 @@ def collectionDelete(self, coll_id, context=None): ------ Exception : On communication or server error """ - msg = auth.CollDeleteRequest() + msg = sdms.CollDeleteRequest() if isinstance(coll_id, list): for i in coll_id: @@ -1234,7 +1232,7 @@ def collectionItemsList(self, coll_id, offset=0, count=20, context=None): Exception : On communication or server error Exception : On invalid options """ - msg = auth.CollReadRequest() + msg = sdms.CollReadRequest() msg.count = count msg.offset = offset msg.id = self._resolve_id(coll_id, context) @@ -1276,7 +1274,7 @@ def collectionItemsUpdate(self, coll_id, add_ids=None, rem_ids=None, context=Non Exception : On communication or server error Exception : On invalid options """ - msg = auth.CollWriteRequest() + msg = sdms.CollWriteRequest() msg.id = self._resolve_id(coll_id, context) if isinstance(add_ids, list): @@ -1317,7 +1315,7 @@ def collectionGetParents(self, coll_id, inclusive=False, context=None): Exception : On communication or server error Exception : On invalid options """ - msg = auth.CollGetParentsRequest() + msg = sdms.CollGetParentsRequest() msg.id = self._resolve_id(coll_id, context) msg.inclusive = inclusive @@ -1348,7 +1346,7 @@ def queryList(self, offset=0, count=20): Exception : On communication or server error Exception : On invalid options """ - msg = auth.QueryListRequest() + msg = sdms.QueryListRequest() msg.offset = offset msg.count = count @@ -1371,7 +1369,7 @@ def queryView(self, query_id): ------ Exception : On communication or server error """ - msg = auth.QueryViewRequest() + msg = sdms.QueryViewRequest() msg.id = query_id return self._mapi.sendRecv(msg) @@ -1422,7 +1420,7 @@ def queryCreate( Exception : On communication or server error Exception : On invalid options """ - msg = auth.QueryCreateRequest() + msg = sdms.QueryCreateRequest() msg.title = title self._buildSearchRequest( @@ -1487,7 +1485,7 @@ def queryUpdate( Exception : On invalid options """ - msg = auth.QueryUpdateRequest() + msg = sdms.QueryUpdateRequest() msg.id = query_id if title is not None: @@ -1532,7 +1530,7 @@ def queryDelete(self, query_id): ------ Exception : On communication or server error """ - msg = auth.QueryDeleteRequest() + msg = sdms.QueryDeleteRequest() msg.id.append(query_id) return self._mapi.sendRecv(msg) @@ -1560,7 +1558,7 @@ def queryExec(self, query_id, offset=0, count=20): Exception : On communication or server error Exception : On invalid options """ - msg = auth.QueryExecRequest() + msg = sdms.QueryExecRequest() msg.id = query_id msg.offset = offset msg.count = count @@ -1612,7 +1610,7 @@ def queryDirect( Exception : On communication or server error Exception : On invalid options """ - msg = auth.SearchRequest() + msg = sdms.SearchRequest() self._buildSearchRequest( msg, @@ -1776,7 +1774,7 @@ def userListCollaborators(self, offset=0, count=20): Exception : On communication or server error Exception : On invalid options """ - msg = auth.UserListCollabRequest() + msg = sdms.UserListCollabRequest() msg.offset = offset msg.count = count @@ -1802,7 +1800,7 @@ def userListAll(self, offset=0, count=20): Exception : On communication or server error Exception : On invalid options """ - msg = auth.UserListAllRequest() + msg = sdms.UserListAllRequest() msg.offset = offset msg.count = count @@ -1826,7 +1824,7 @@ def userView(self, uid): Exception : On communication or server error Exception : On invalid options """ - msg = auth.UserViewRequest() + msg = sdms.UserViewRequest() msg.uid = uid return self._mapi.sendRecv(msg) @@ -1868,7 +1866,7 @@ def projectList(self, owned=True, admin=True, member=True, offset=0, count=20): Exception : On communication or server error Exception : On invalid options """ - msg = auth.ProjectListRequest() + msg = sdms.ProjectListRequest() msg.as_owner = owned msg.as_admin = admin msg.as_member = member @@ -1895,7 +1893,7 @@ def projectView(self, project_id): Exception : On communication or server error Exception : On invalid options """ - msg = auth.ProjectViewRequest() + msg = sdms.ProjectViewRequest() msg.id = project_id return self._mapi.sendRecv(msg) @@ -1918,7 +1916,7 @@ def projectGetRole(self, project_id): Exception : On communication or server error Exception : On invalid options """ - msg = auth.ProjectGetRoleRequest() + msg = sdms.ProjectGetRoleRequest() msg.id = project_id reply = self._mapi.sendRecv(msg) @@ -1951,7 +1949,7 @@ def sharedList(self, inc_users=None, inc_projects=None, subject=None): Exception : On communication or server error Exception : On invalid options """ - msg = auth.ACLSharedListRequest() + msg = sdms.ACLSharedListRequest() if inc_users is not None: msg.inc_users = inc_users @@ -1981,7 +1979,7 @@ def sharedUsersList( self ): Exception : On communication or server error Exception : On invalid options """ - msg = auth.ACLByUserRequest() + msg = sdms.ACLByUserRequest() return self._mapi.sendRecv( msg ) @@ -1999,7 +1997,7 @@ def sharedProjectsList( self ): Exception : On communication or server error Exception : On invalid options """ - msg = auth.ACLByProjRequest() + msg = sdms.ACLByProjRequest() return self._mapi.sendRecv( msg ) ''' @@ -2031,7 +2029,7 @@ def sharedListItems(self, owner_id, context=None, offset=None, count=None): """ # TODO add support for offset & count - msg = auth.ACLSharedListItemsRequest() + msg = sdms.ACLSharedListItemsRequest() msg.owner = owner_id.lower() if context is not None: msg.subject = context.lower() @@ -2081,7 +2079,7 @@ def taskList( if since is not None and (time_from is not None or time_to is not None): raise Exception("Cannot specify 'since' and 'from'/'to' ranges.") - msg = auth.TaskListRequest() + msg = sdms.TaskListRequest() if time_from is not None: ts = self.strToTimestamp(time_from) @@ -2191,12 +2189,12 @@ def taskView(self, task_id=None): Exception : On invalid options """ if task_id: - msg = auth.TaskViewRequest() + msg = sdms.TaskViewRequest() msg.task_id = task_id reply = self._mapi.sendRecv(msg) else: - msg = auth.TaskListRequest() + msg = sdms.TaskListRequest() msg.offset = 0 msg.count = 1 @@ -2221,7 +2219,7 @@ def endpointListRecent(self): ------ Exception : On communication or server error """ - msg = auth.UserGetRecentEPRequest() + msg = sdms.UserGetRecentEPRequest() return self._mapi.sendRecv(msg) @@ -2305,7 +2303,7 @@ def setupCredentials(self): "Client configuration directory and/or client key files not configured" ) - msg = auth.GenerateCredentialsRequest() + msg = sdms.GenerateCredentialsRequest() reply = self._mapi.sendRecv(msg) @@ -2352,7 +2350,7 @@ def setContext(self, item_id=None): id2 = item_id if id2[0:2] == "p/": - msg = auth.ProjectViewRequest() + msg = sdms.ProjectViewRequest() msg.id = id2 else: if id2[0:2] != "u/": @@ -2364,7 +2362,7 @@ def setContext(self, item_id=None): ) id2 = "u/" + id2 - msg = auth.UserViewRequest() + msg = sdms.UserViewRequest() msg.uid = id2 # Don't need reply - just using to throw an except if id/uid is diff --git a/python/datafed_pkg/datafed/Connection.py b/python/datafed_pkg/datafed/Connection.py index d34dd1e1c..87546f528 100644 --- a/python/datafed_pkg/datafed/Connection.py +++ b/python/datafed_pkg/datafed/Connection.py @@ -6,13 +6,11 @@ # unserialized, and custom framing is generated to efficiently convey message # type, size, and a re-association context value. # -# The Google protobuf library does not provide a mechanism for identifying -# message types numerically (only by string), so a build-time custom tool -# (pyproto_add_msg_idx.py) is used to generate the mappings from message -# names to message index (and vice versa) and appends this information as -# dictionaries to the compiled proto files (xxxx_pb2.py). The -# registerProtocol() method then loads uses this information to create -# consistent message type framing for python send/recv methods. +# Message type identification is derived at runtime from the Envelope proto +# message's field descriptors. Each message type has a stable field number +# in the Envelope, which serves as its wire-format type ID. This replaces +# the previous build-time pyproto_add_msg_idx.py hack that assigned type +# IDs based on message declaration order within proto files. from google.protobuf.message_factory import GetMessageClass import logging @@ -69,6 +67,10 @@ def __init__( self._msg_desc_by_type = {} self._msg_desc_by_name = {} self._msg_type_by_desc = {} + self._field_by_msg_desc = {} + + self._envelope_class = None + self._envelope_desc = None self._address = "tcp://{0}:{1}".format(server_host, server_port) # init zeromq @@ -116,19 +118,65 @@ def __del__(self): self._zmq_ctxt.destroy() ## - # @brief Register a protobuf module + # @brief Register message types from the Envelope proto message + # + # This method derives message type mappings at runtime by inspecting the + # Envelope message's field descriptors. Each field in the Envelope that + # wraps a message type has a stable field number, which becomes the + # message type ID used in wire framing. This replaces the old + # registerProtocol() approach that relied on build-time generated + # _msg_name_to_type / _msg_type_to_name dicts. + # + # @param envelope_module - The compiled envelope_pb2 module + # @param envelope_class_name - Name of the envelope message (default: "Envelope") + # + def registerEnvelope(self, envelope_module, envelope_class_name="Envelope"): + envelope_class = getattr(envelope_module, envelope_class_name) + envelope_desc = envelope_class.DESCRIPTOR + + # Store for envelope wrapping/unwrapping + self._envelope_class = envelope_class + self._envelope_desc = envelope_desc + + for field in envelope_desc.fields: + if field.message_type is None: + # Skip non-message fields (e.g. scalars) if any exist + continue + + msg_type = field.number + desc = field.message_type + + self._msg_desc_by_type[msg_type] = desc + self._msg_desc_by_name[desc.name] = desc + self._msg_type_by_desc[desc] = msg_type + self._field_by_msg_desc[desc] = field + + self._logger.debug( + "Registered %d message types from %s", + len(self._msg_desc_by_type), + envelope_class_name, + ) + + ## + # @brief Register a protobuf module (DEPRECATED - use registerEnvelope) # # This method registers an imported protobuf module (_pb2 file) for use # with the Connection class. Registration is required for proper message # framing and serialization. # + # This relies on build-time generated _msg_name_to_type dicts appended + # to _pb2 files by pyproto_add_msg_idx.py. Prefer registerEnvelope() + # which derives mappings from envelope field numbers at runtime. + # # @param msg_module - Protobuf module (imported *_pb2 module) # def registerProtocol(self, msg_module): - # Message descriptors are stored by name created by protobuf compiler - # A custom post-proc tool generates and appends _msg_name_to_type with - # defined DataFed-sepcific numer message types - + import warnings + warnings.warn( + "registerProtocol() is deprecated, use registerEnvelope() instead", + DeprecationWarning, + stacklevel=2, + ) for name, desc in sorted(msg_module.DESCRIPTOR.message_types_by_name.items()): msg_t = msg_module._msg_name_to_type[name] self._msg_desc_by_type[msg_t] = desc @@ -138,15 +186,15 @@ def registerProtocol(self, msg_module): ## # @brief Receive a message # - # Receive a protobuf message with timeout. This method automatically - # parses and creates a new protobuf message class based on received - # framing. The new message object, the message name (defined in the - # associated proto file), and re-association context are returned as - # a tuple. On timeout, (None,None,None) is returned. + # Receive a protobuf message with timeout. The wire payload is an + # Envelope message; this method deserializes the Envelope and extracts + # the inner message via the oneof payload field. The inner message + # object, its name, and re-association context are returned as a tuple. + # On timeout, (None, None, None) is returned. # # @param timeout - Timeout in milliseconds - # @return Tuple of message, message type, and re-association context - # @retval (object,str,int) or (None,None,None) on timeout + # @return Tuple of message, message name, and re-association context + # @retval (object, str, int) or (None, None, None) on timeout # @exception Exception: if unregistered message type is received. # def recv(self, a_timeout=1000): @@ -180,38 +228,46 @@ def recv(self, a_timeout=1000): # client self._socket.recv_string(0) - # receive custom frame header and unpack + # Receive frame: 8 bytes = uint32 size + uint16 msg_type + uint16 context frame_data = self._socket.recv(0) - frame_values = struct.unpack(">LBBH", frame_data) - msg_type = (frame_values[1] << 8) | frame_values[2] - - # find message descriptor based on type (descriptor index) + frame_values = struct.unpack(">LHH", frame_data) + body_size = frame_values[0] + msg_type = frame_values[1] + ctxt = frame_values[2] - if not (msg_type in self._msg_desc_by_type): + if msg_type not in self._msg_desc_by_type: raise Exception( "received unregistered message type: {}".format(msg_type) ) - desc = self._msg_desc_by_type[msg_type] + data = self._socket.recv(0) - if frame_values[0] > 0: - # Create message by parsing content - data = self._socket.recv(0) - reply = GetMessageClass(desc)() - reply.ParseFromString(data) + if body_size > 0: + # Deserialize as Envelope + envelope = self._envelope_class() + envelope.ParseFromString(data) + + # Extract inner message from the oneof + payload_field = envelope.WhichOneof("payload") + if payload_field is None: + raise Exception("Received Envelope with no payload set") + reply = getattr(envelope, payload_field) else: - # No content, just create message instance - data = self._socket.recv(0) + # Zero-size body: create empty message instance from type + desc = self._msg_desc_by_type[msg_type] reply = GetMessageClass(desc)() - return reply, desc.name, frame_values[3] + return reply, reply.DESCRIPTOR.name, ctxt else: return None, None, None ## # @brief Send a message # - # Serializes and sends framing and message payload over connection. + # Wraps the inner message in an Envelope, serializes it, and sends + # framing and payload over the connection. The frame header carries the + # message type (Envelope field number) for efficient routing on the + # server side. # # @param message - The protobuf message object to be sent # @param ctxt - Reply re-association value (int) @@ -219,9 +275,15 @@ def recv(self, a_timeout=1000): # def send(self, message, ctxt): # Find msg type by descriptor look-up - if not (message.DESCRIPTOR in self._msg_type_by_desc): + if message.DESCRIPTOR not in self._msg_type_by_desc: raise Exception("Attempt to send unregistered message type.") + msg_type = self._msg_type_by_desc[message.DESCRIPTOR] + field = self._field_by_msg_desc[message.DESCRIPTOR] + + # Wrap inner message in Envelope + envelope = self._envelope_class() + getattr(envelope, field.name).CopyFrom(message) # Initial Null frame self._socket.send_string("BEGIN_DATAFED", zmq.SNDMORE) @@ -235,12 +297,12 @@ def send(self, message, ctxt): self._socket.send_string(self._pub_key, zmq.SNDMORE) self._socket.send_string("no_user", zmq.SNDMORE) - # Serialize - data = message.SerializeToString() + # Serialize the Envelope (not the inner message) + data = envelope.SerializeToString() data_sz = len(data) - # Build the message frame, to match C-struct MessageFrame - frame = struct.pack(">LBBH", data_sz, msg_type >> 8, msg_type & 0xFF, ctxt) + # Build the message frame: uint32 size + uint16 msg_type + uint16 context + frame = struct.pack(">LHH", data_sz, msg_type, ctxt) if data_sz > 0: # Send frame and payload diff --git a/python/datafed_pkg/datafed/MessageLib.py b/python/datafed_pkg/datafed/MessageLib.py index 62354c0eb..f0964e7d7 100644 --- a/python/datafed_pkg/datafed/MessageLib.py +++ b/python/datafed_pkg/datafed/MessageLib.py @@ -12,9 +12,7 @@ import zmq -from . import Version_pb2 -from . import SDMS_Anon_pb2 as anon -from . import SDMS_Auth_pb2 as auth +from . import envelope_pb2 as proto from . import Connection from . import VERSION @@ -166,8 +164,7 @@ def __init__( server_host, server_port, _server_pub_key, _client_pub_key, _client_priv_key ) - self._conn.registerProtocol(anon) - self._conn.registerProtocol(auth) + self._conn.registerEnvelope(proto) # Make a request to pypi package_name = "datafed" # Replace with the package name you want to check @@ -191,14 +188,14 @@ def __init__( self.new_client_avail = latest_version_on_pypi # Check for compatible protocol versions - reply, mt = self.sendRecv(anon.VersionRequest(), 10000) + reply, mt = self.sendRecv(proto.VersionRequest(), 10000) if reply is None: raise Exception( "Timeout waiting for server connection. Make sure" "the right ports are open." ) - if reply.api_major != Version_pb2.DATAFED_COMMON_PROTOCOL_API_MAJOR: + if reply.api_major != VERSION.DATAFED_COMMON_PROTOCOL_API_MAJOR: error_msg = ( "Incompatible server api detected {}.{}.{}, you are running " "{}.{}.{} consider " @@ -206,9 +203,9 @@ def __init__( reply.api_major, reply.api_minor, reply.api_patch, - Version_pb2.DATAFED_COMMON_PROTOCOL_API_MAJOR, - Version_pb2.DATAFED_COMMON_PROTOCOL_API_MINOR, - Version_pb2.DATAFED_COMMON_PROTOCOL_API_PATCH, + VERSION.DATAFED_COMMON_PROTOCOL_API_MAJOR, + VERSION.DATAFED_COMMON_PROTOCOL_API_MINOR, + VERSION.DATAFED_COMMON_PROTOCOL_API_PATCH, ) ) if self.new_client_avail: @@ -223,7 +220,7 @@ def __init__( self.manualAuthByToken(client_token) else: # Check if server authenticated based on keys - reply, mt = self.sendRecv(anon.GetAuthStatusRequest(), 10000) + reply, mt = self.sendRecv(proto.GetAuthStatusRequest(), 10000) self._auth = reply.auth self._uid = reply.uid @@ -263,7 +260,7 @@ def getAuthStatus(self): # @exception Exception: On communication timeout or authentication failure. # def manualAuthByPassword(self, uid, password): - msg = anon.AuthenticateByPasswordRequest() + msg = proto.AuthenticateByPasswordRequest() msg.uid = uid msg.password = password a, b = self.sendRecv(msg) @@ -272,7 +269,7 @@ def manualAuthByPassword(self, uid, password): self._conn.reset() # Test auth status - reply, mt = self.sendRecv(anon.GetAuthStatusRequest()) + reply, mt = self.sendRecv(proto.GetAuthStatusRequest()) if not reply.auth: raise Exception("Password authentication failed.") @@ -280,7 +277,7 @@ def manualAuthByPassword(self, uid, password): self._uid = reply.uid def manualAuthByToken(self, token): - msg = anon.AuthenticateByTokenRequest() + msg = proto.AuthenticateByTokenRequest() msg.token = token self.sendRecv(msg) @@ -288,7 +285,7 @@ def manualAuthByToken(self, token): self._conn.reset() # Test auth status - reply, mt = self.sendRecv(anon.GetAuthStatusRequest()) + reply, mt = self.sendRecv(proto.GetAuthStatusRequest()) if not reply.auth: raise Exception("Token authentication failed") @@ -332,7 +329,7 @@ def getDefaultTimeout(self): def getDailyMessage(self): # Get daily message, if set - reply, mt = self.sendRecv(anon.DailyMessageRequest(), 10000) + reply, mt = self.sendRecv(proto.DailyMessageRequest(), 10000) if reply is None: raise Exception("Timeout waiting for server connection.") diff --git a/python/datafed_pkg/datafed/VERSION.py.in b/python/datafed_pkg/datafed/VERSION.py.in index fc9c6a3b3..03ae7c5f5 100644 --- a/python/datafed_pkg/datafed/VERSION.py.in +++ b/python/datafed_pkg/datafed/VERSION.py.in @@ -1 +1,9 @@ __version__="@DATAFED_PYTHON_CLIENT_MAJOR@.@DATAFED_PYTHON_CLIENT_MINOR@.@DATAFED_PYTHON_CLIENT_PATCH@@DATAFED_PYTHON_CLIENT_RELEASE_TYPE@@DATAFED_PYTHON_CLIENT_PRE_RELEASE_IDENTIFER@" +DATAFED_COMMON_PROTOCOL_API_MAJOR=@DATAFED_COMMON_PROTOCOL_API_MAJOR@ +DATAFED_COMMON_PROTOCOL_API_MINOR=@DATAFED_COMMON_PROTOCOL_API_MINOR@ +DATAFED_COMMON_PROTOCOL_API_PATCH=@DATAFED_COMMON_PROTOCOL_API_PATCH@ +DATAFED_RELEASE_YEAR=@DATAFED_RELEASE_YEAR@ +DATAFED_RELEASE_MONTH=@DATAFED_RELEASE_MONTH@ +DATAFED_RELEASE_DAY=@DATAFED_RELEASE_DAY@ +DATAFED_RELEASE_HOUR=@DATAFED_RELEASE_HOUR@ +DATAFED_RELEASE_MINUTE=@DATAFED_RELEASE_MINUTE@ diff --git a/python/datafed_pkg/scripts/fix_proto_imports.sh b/python/datafed_pkg/scripts/fix_proto_imports.sh new file mode 100755 index 000000000..06b10a1a0 --- /dev/null +++ b/python/datafed_pkg/scripts/fix_proto_imports.sh @@ -0,0 +1,65 @@ +#!/bin/sh +set -e + +PROTO_DIR="$1" +ROOT_DIR="${2:-$1}" + +if [ -z "$PROTO_DIR" ]; then + echo "Usage: fix_proto_imports.sh [root_dir]" + echo " proto_output_dir: directory to find and fix _pb2.py files" + echo " root_dir: package root for computing relative depth (defaults to proto_output_dir)" + exit 1 +fi + +find "$PROTO_DIR" -name '*_pb2.py' | while read f; do + relpath=$(realpath --relative-to="$ROOT_DIR" "$f") + case "$relpath" in + */*) + sed -i \ + -e 's:^from anon import:from ..anon import:g' \ + -e 's:^from anon\.:from ..anon.:g' \ + -e 's:^from auth import:from ..auth import:g' \ + -e 's:^from auth\.:from ..auth.:g' \ + -e 's:^from enums import:from ..enums import:g' \ + -e 's:^from enums\.:from ..enums.:g' \ + -e 's:^from messages import:from ..messages import:g' \ + -e 's:^from messages\.:from ..messages.:g' \ + -e 's:^import \(.*_pb2\):from . import \1:g' \ + "$f" + ;; + *) + sed -i \ + -e 's:^from anon import:from .anon import:g' \ + -e 's:^from anon\.:from .anon.:g' \ + -e 's:^from auth import:from .auth import:g' \ + -e 's:^from auth\.:from .auth.:g' \ + -e 's:^from enums import:from .enums import:g' \ + -e 's:^from enums\.:from .enums.:g' \ + -e 's:^from messages import:from .messages import:g' \ + -e 's:^from messages\.:from .messages.:g' \ + -e 's:^import \(.*_pb2\):from . import \1:g' \ + "$f" + ;; + esac +done + +for subdir in anon auth enums messages; do + if [ -d "$ROOT_DIR/$subdir" ]; then + touch "$ROOT_DIR/$subdir/__init__.py" + fi +done + +# Append re-exports to envelope_pb2.py for backward compatibility +# Connection.py uses getattr(envelope_module, class_name) for dynamic dispatch +echo "" >>"$ROOT_DIR/envelope_pb2.py" +echo "# Re-export all message and enum classes for dynamic lookup" >>"$ROOT_DIR/envelope_pb2.py" + +for subdir in anon auth enums messages; do + if [ -d "$ROOT_DIR/$subdir" ]; then + for f in "$ROOT_DIR/$subdir"/*_pb2.py; do + [ -f "$f" ] || continue + module=$(basename "$f" .py) + echo "from .$subdir.$module import *" >>"$ROOT_DIR/envelope_pb2.py" + done + fi +done diff --git a/python/datafed_pkg/setup.py b/python/datafed_pkg/setup.py index 21abefdcc..55ebd76d9 100644 --- a/python/datafed_pkg/setup.py +++ b/python/datafed_pkg/setup.py @@ -22,6 +22,13 @@ long_description_content_type="text/markdown", url="https://github.com/ORNL/DataFed", packages=setuptools.find_packages(), + package_data={ + "datafed": ["*.py"], + "datafed.anon": ["*.py"], + "datafed.auth": ["*.py"], + "datafed.enums": ["*.py"], + "datafed.messages": ["*.py"], + }, setup_requires=["setuptools"], install_requires=install_requires, entry_points={"console_scripts": ["datafed = datafed.CLI:run"]}, @@ -31,3 +38,5 @@ "Operating System :: OS Independent", ], ) + + diff --git a/python/datafed_pkg/test/security.py b/python/datafed_pkg/test/security.py index d9b479c1a..a8e42c027 100755 --- a/python/datafed_pkg/test/security.py +++ b/python/datafed_pkg/test/security.py @@ -2,7 +2,7 @@ import getpass import datafed.CommandLib -import datafed.SDMS_Auth_pb2 as auth +import datafed.envelope_pb2 as sdms opts = {} @@ -15,7 +15,7 @@ api.loginByPassword(uid, password) -msg = auth.UserCreateRequest() +msg = sdms.UserCreateRequest() msg.uid = "newuser" msg.password = "temptemp" msg.name = "New User" diff --git a/python/pyproto_add_msg_idx.py b/python/pyproto_add_msg_idx.py deleted file mode 100755 index 233f83b33..000000000 --- a/python/pyproto_add_msg_idx.py +++ /dev/null @@ -1,61 +0,0 @@ -#!/usr/bin/env python3 - -""" -Protobuf processing to generate message ID maps for C++, Python, and JS -""" - -import sys -import re - -print("args", sys.argv) - -pf_in = open(sys.argv[1], "r") -pf_out = open(sys.argv[2], "a") - -while True: - line = pf_in.readline() - if len(line) == 0: - sys.exit(-1) - parts = re.split(r"\W+", line.strip()) - # print( line, parts ) - try: - idx = parts.index("ID") - # print( "ID:", parts[idx+1] ) - msg_type = int(parts[idx + 1]) << 8 - break - except BaseException: - pass - -# msg_type = 0 - -by_type = [] -idx = 0 - -pf_out.write("\n_msg_name_to_type = {\n") - -while True: - line = pf_in.readline() - if len(line) == 0: - break - - if line.startswith("message "): - msg_name = line.split()[1] - by_type.append(msg_name) - # print( msg_name, msg_type ) - if idx > 0: - pf_out.write(",\n") - pf_out.write(" '{}' : {}".format(msg_name, msg_type | idx)) - idx += 1 - -pf_out.write("\n}\n\n_msg_type_to_name = {\n") - -idx = 0 -for name in by_type: - if idx > 0: - pf_out.write(",\n") - pf_out.write(" {} : '{}'".format(msg_type | idx, name)) - idx += 1 - -pf_out.write("\n}\n") - -sys.exit(0) diff --git a/repository/gridftp/globus5/authz/source/AuthzWorker.cpp b/repository/gridftp/globus5/authz/source/AuthzWorker.cpp index 234c1e166..9050bb45b 100644 --- a/repository/gridftp/globus5/authz/source/AuthzWorker.cpp +++ b/repository/gridftp/globus5/authz/source/AuthzWorker.cpp @@ -14,11 +14,8 @@ #include "common/TraceException.hpp" #include "common/Util.hpp" -// Protobuf includes -#include "common/SDMS.pb.h" -#include "common/SDMS_Anon.pb.h" -#include "common/SDMS_Auth.pb.h" -#include "common/Version.pb.h" +// Proto files +#include "common/envelope.pb.h" // Standard includes #include @@ -28,9 +25,8 @@ #include #include +using namespace SDMS; using namespace std; -using namespace SDMS::Anon; -using namespace SDMS::Auth; namespace { @@ -507,7 +503,7 @@ int AuthzWorker::processResponse(ICommunicator::Response &response) { auto payload = std::get(response.message->getPayload()); - Anon::NackReply *nack = dynamic_cast(payload); + NackReply *nack = dynamic_cast(payload); if (!nack) { return 0; } else { @@ -581,7 +577,7 @@ int AuthzWorker::checkAuth(char *client_id, char *path, char *action) { return 0; } - auto auth_req = std::make_unique(); + auto auth_req = std::make_unique(); auth_req->set_repo(m_config->repo_id); auth_req->set_client(client_id); @@ -619,19 +615,19 @@ const char *getVersion() { const char *getAPIVersion() { static std::string ver_str = - std::to_string(DATAFED_COMMON_PROTOCOL_API_MAJOR) + "." + - std::to_string(DATAFED_COMMON_PROTOCOL_API_MINOR) + "." + - std::to_string(DATAFED_COMMON_PROTOCOL_API_PATCH); + std::to_string(protocol::version::MAJOR) + "." + + std::to_string(protocol::version::MINOR) + "." + + std::to_string(protocol::version::PATCH); return ver_str.c_str(); } const char *getReleaseVersion() { - static std::string ver_str = std::to_string(DATAFED_RELEASE_YEAR) + "." + - std::to_string(DATAFED_RELEASE_MONTH) + "." + - std::to_string(DATAFED_RELEASE_DAY) + "." + - std::to_string(DATAFED_RELEASE_HOUR) + "." + - std::to_string(DATAFED_RELEASE_MINUTE); + static std::string ver_str = std::to_string(release::YEAR) + "." + + std::to_string(release::MONTH) + "." + + std::to_string(release::DAY) + "." + + std::to_string(release::HOUR) + "." + + std::to_string(release::MINUTE); return ver_str.c_str(); } diff --git a/repository/gridftp/globus5/authz/source/Version.hpp.in b/repository/gridftp/globus5/authz/source/Version.hpp.in index cdd5e35e0..f698e5b7a 100644 --- a/repository/gridftp/globus5/authz/source/Version.hpp.in +++ b/repository/gridftp/globus5/authz/source/Version.hpp.in @@ -10,6 +10,23 @@ namespace SDMS { constexpr int PATCH = @DATAFED_AUTHZ_PATCH@; } } + + namespace protocol { + namespace version { + constexpr int MAJOR = @DATAFED_COMMON_PROTOCOL_API_MAJOR@; + constexpr int MINOR = @DATAFED_COMMON_PROTOCOL_API_MINOR@; + constexpr int PATCH = @DATAFED_COMMON_PROTOCOL_API_PATCH@; + } + } + + namespace release { + constexpr int YEAR = @DATAFED_RELEASE_YEAR@; + constexpr int MONTH = @DATAFED_RELEASE_MONTH@; + constexpr int DAY = @DATAFED_RELEASE_DAY@; + constexpr int HOUR = @DATAFED_RELEASE_HOUR@; + constexpr int MINUTE = @DATAFED_RELEASE_MINUTE@; + } + } #endif // AUTHZ_VERSION_HPP diff --git a/repository/gridftp/globus5/authz/tests/unit/test_AuthzWorker.cpp b/repository/gridftp/globus5/authz/tests/unit/test_AuthzWorker.cpp index ba51d8d08..5b03f8466 100644 --- a/repository/gridftp/globus5/authz/tests/unit/test_AuthzWorker.cpp +++ b/repository/gridftp/globus5/authz/tests/unit/test_AuthzWorker.cpp @@ -15,7 +15,7 @@ #include "common/ICommunicator.hpp" #include "common/IMessage.hpp" #include "common/MessageFactory.hpp" -#include "common/SDMS_Anon.pb.h" +#include "common/envelope.pb.h" #include "common/TraceException.hpp" extern "C" { @@ -397,7 +397,7 @@ BOOST_AUTO_TEST_CASE(ProcessResponseWithValidMessage) { SDMS::MessageState::REQUEST); response.message->set(SDMS::constants::message::google::CONTEXT, context); auto auth_by_token_req = - std::make_unique(); + std::make_unique(); std::string token = "golden_chest"; auth_by_token_req->set_token(token); @@ -429,7 +429,7 @@ BOOST_AUTO_TEST_CASE(ProcessResponseWithNackReply) { response.message->set(SDMS::MessageAttribute::STATE, SDMS::MessageState::REQUEST); response.message->set(SDMS::constants::message::google::CONTEXT, context); - auto nack = std::make_unique(); + auto nack = std::make_unique(); response.message->setPayload(std::move(nack)); diff --git a/web/datafed-ws.js b/web/datafed-ws.js index d4373c707..b52b23c31 100755 --- a/web/datafed-ws.js +++ b/web/datafed-ws.js @@ -15,7 +15,7 @@ if (process.argv.length != 3) { throw "Invalid arguments, usage: datafed-ws config-file"; } -import web_version from "./version.js"; +import version from "./version.js"; import express from "express"; // For REST api import session from "express-session"; import sanitizeHtml from "sanitize-html"; @@ -55,6 +55,7 @@ var g_host, g_test, g_msg_by_id = {}, g_msg_by_name = {}, + g_envelope_type, g_core_sock = zmq.socket("dealer"), g_core_serv_addr, g_globus_auth, @@ -64,7 +65,7 @@ var g_host, g_ctx_next = 0, g_client_id, g_client_secret, - g_ready_start = 4, + g_ready_start = 3, g_version, g_ver_release_year, g_ver_release_month, @@ -145,6 +146,25 @@ class Logger { const logger = new Logger(LogLevel.INFO); +g_ver_release_year = version.DATAFED_RELEASE_YEAR; +g_ver_release_month = version.DATAFED_RELEASE_MONTH; +g_ver_release_day = version.DATAFED_RELEASE_DAY; +g_ver_release_hour = version.DATAFED_RELEASE_HOUR; +g_ver_release_minute = version.DATAFED_RELEASE_MINUTE; + +g_version = + g_ver_release_year + + "." + + g_ver_release_month + + "." + + g_ver_release_day + + "." + + g_ver_release_hour + + "." + + g_ver_release_minute; + +if (--g_ready_start == 0) startServer(); + function getCurrentLineNumber() { const stackTrace = new Error().stack; const lineMatches = stackTrace.match(/:\d+:\d+/g); @@ -1108,23 +1128,6 @@ app.get("/api/dat/lock", (a_req, a_resp) => { ); }); -app.get("/api/dat/lock/toggle", (a_req, a_resp) => { - sendMessage("RecordLockToggleRequest", { id: a_req.query.id }, a_req, a_resp, function (reply) { - a_resp.send(reply); - }); -}); - -app.get("/api/dat/copy", (a_req, a_resp) => { - var params = { - sourceId: a_req.query.src, - destId: a_req.query.dst, - }; - - sendMessage("DataCopyRequest", params, a_req, a_resp, function (reply) { - a_resp.send(reply); - }); -}); - app.get("/api/dat/delete", (a_req, a_resp) => { sendMessage( "RecordDeleteRequest", @@ -1211,18 +1214,6 @@ app.get("/api/dat/put", (a_req, a_resp) => { }); }); -app.get("/api/dat/dep/get", (a_req, a_resp) => { - sendMessage( - "RecordGetDependenciesRequest", - { id: a_req.query.ids }, - a_req, - a_resp, - function (reply) { - a_resp.send(reply); - }, - ); -}); - app.get("/api/dat/dep/graph/get", (a_req, a_resp) => { sendMessage( "RecordGetDependencyGraphRequest", @@ -1569,12 +1560,6 @@ app.get("/api/col/published/list", (a_req, a_resp) => { }); }); -app.post("/api/cat/search", (a_req, a_resp) => { - sendMessage("CatalogSearchRequest", a_req.body, a_req, a_resp, function (reply) { - a_resp.send(reply); - }); -}); - app.get("/api/globus/consent_url", storeCollectionId, (a_req, a_resp) => { const { requested_scopes, state, refresh_tokens, query_params } = a_req.query; @@ -1590,12 +1575,6 @@ app.get("/api/globus/consent_url", storeCollectionId, (a_req, a_resp) => { a_resp.json({ consent_url }); }); -app.post("/api/col/pub/search/data", (a_req, a_resp) => { - sendMessage("RecordSearchPublishedRequest", a_req.body, a_req, a_resp, function (reply) { - a_resp.send(reply); - }); -}); - app.get("/api/repo/list", (a_req, a_resp) => { var params = {}; if (a_req.query.all) params.all = a_req.query.all; @@ -1772,18 +1751,6 @@ app.get("/api/top/list/topics", (a_req, a_resp) => { }); }); -app.get("/api/top/list/coll", (a_req, a_resp) => { - var par = { topicId: a_req.query.id }; - if (a_req.query.offset != undefined && a_req.query.count != undefined) { - par.offset = a_req.query.offset; - par.count = a_req.query.count; - } - - sendMessage("TopicListCollectionsRequest", par, a_req, a_resp, function (reply) { - a_resp.json(reply); - }); -}); - app.get("/api/top/view", (a_req, a_resp) => { sendMessage("TopicViewRequest", { id: a_req.query.id }, a_req, a_resp, function (reply) { a_resp.json(reply); @@ -2065,15 +2032,17 @@ function sendMessage(a_msg_name, a_msg_data, a_req, a_resp, a_cb, a_anon) { a_resp.setHeader("Content-Type", "application/json"); allocRequestContext(a_resp, function (ctx) { - var msg = g_msg_by_name[a_msg_name]; - if (!msg) throw "Invalid message type: " + a_msg_name; + var msg_info = g_msg_by_name[a_msg_name]; + if (!msg_info) throw "Invalid message type: " + a_msg_name; - var msg_buf = msg.encode(a_msg_data).finish(); + // Wrap inner message data in an Envelope (matches C++ sendBody wrapInEnvelope) + var envelope_data = {}; + envelope_data[msg_info.field_name] = a_msg_data; + var msg_buf = g_envelope_type.encode(envelope_data).finish(); var frame = Buffer.alloc(8); frame.writeUInt32BE(msg_buf.length, 0); - frame.writeUInt8(msg._pid, 4); - frame.writeUInt8(msg._mid, 5); + frame.writeUInt16BE(msg_info.field_id, 4); frame.writeUInt16BE(ctx, 6); g_ctx[ctx] = function (a_reply) { @@ -2121,7 +2090,10 @@ function sendMessage(a_msg_name, a_msg_data, a_req, a_resp, a_cb, a_anon) { sendMessage.name, getCurrentLineNumber(), "MsgType is: " + - msg._msg_type + + msg_info.field_id + + " (" + + a_msg_name + + ")" + " Writing ctx to frame, " + ctx + " buffer size " + @@ -2142,7 +2114,10 @@ function sendMessage(a_msg_name, a_msg_data, a_req, a_resp, a_cb, a_anon) { sendMessage.name, getCurrentLineNumber(), "MsgType is: " + - msg._msg_type + + msg_info.field_id + + " (" + + a_msg_name + + ")" + " Writing ctx to frame, " + ctx + " buffer size " + @@ -2154,17 +2129,19 @@ function sendMessage(a_msg_name, a_msg_data, a_req, a_resp, a_cb, a_anon) { } function sendMessageDirect(a_msg_name, a_client, a_msg_data, a_cb) { - var msg = g_msg_by_name[a_msg_name]; - if (!msg) throw "Invalid message type: " + a_msg_name; + var msg_info = g_msg_by_name[a_msg_name]; + if (!msg_info) throw "Invalid message type: " + a_msg_name; allocRequestContext(null, function (ctx) { - var msg_buf = msg.encode(a_msg_data).finish(); + // Wrap inner message data in an Envelope (matches C++ sendBody wrapInEnvelope) + var envelope_data = {}; + envelope_data[msg_info.field_name] = a_msg_data; + var msg_buf = g_envelope_type.encode(envelope_data).finish(); var frame = Buffer.alloc(8); // A protobuf message doesn't have to have a payload frame.writeUInt32BE(msg_buf.length, 0); - frame.writeUInt8(msg._pid, 4); - frame.writeUInt8(msg._mid, 5); + frame.writeUInt16BE(msg_info.field_id, 4); frame.writeUInt16BE(ctx, 6); g_ctx[ctx] = a_cb; @@ -2187,7 +2164,10 @@ function sendMessageDirect(a_msg_name, a_client, a_msg_data, a_cb) { sendMessageDirect.name, getCurrentLineNumber(), "MsgType is: " + - msg._msg_type + + msg_info.field_id + + " (" + + a_msg_name + + ")" + " Direct Writing ctx to frame, " + ctx + " buffer size " + @@ -2208,7 +2188,10 @@ function sendMessageDirect(a_msg_name, a_client, a_msg_data, a_cb) { sendMessageDirect.name, getCurrentLineNumber(), "MsgType is: " + - msg._msg_type + + msg_info.field_id + + " (" + + a_msg_name + + ")" + " Direct Writing ctx to frame, " + ctx + " buffer size " + @@ -2219,71 +2202,65 @@ function sendMessageDirect(a_msg_name, a_client, a_msg_data, a_cb) { }); } -function processProtoFile(msg) { - //var mlist = msg.parent.order; - var i, - msg_list = []; - for (i in msg.parent.nested) msg_list.push(msg.parent.nested[i]); - - //msg_list.sort(); - - var pid = msg.values.ID; +/** + * Processes the proto3 Envelope message to build message type maps. + * + * Instead of the old proto2 approach that derived message types from Protocol enum IDs + * and file ordering (pid << 8 | mid), this uses the envelope's oneof field numbers + * as stable message type identifiers. + * + * Each map entry stores: + * - type: the protobufjs Type (for encode/decode of the inner message) + * - field_name: the envelope oneof field name (e.g. "version_request") + * - field_id: the envelope field number (used as msg_type in the frame) + * + * @param {protobuf.Root} root - The loaded protobuf root containing SDMS.Envelope + */ +function processEnvelope(root) { + g_envelope_type = root.lookupType("SDMS.Envelope"); + var payloadOneof = g_envelope_type.oneofs.payload; - for (i = 1; i < msg_list.length; i++) { - msg = msg_list[i]; - msg._pid = pid; - msg._mid = i - 1; - msg._msg_type = (pid << 8) | (i - 1); + if (!payloadOneof) throw "Missing 'payload' oneof in SDMS.Envelope"; - g_msg_by_id[msg._msg_type] = msg; - g_msg_by_name[msg.name] = msg; - } -} + payloadOneof.fieldsArray.forEach(function (field) { + var msgType = field.resolvedType; + if (!msgType) { + logger.warning( + processEnvelope.name, + getCurrentLineNumber(), + "Unresolved type for envelope field: " + field.name, + ); + return; + } -protobuf.load("Version.proto", function (err, root) { - if (err) throw err; + var entry = { + type: msgType, // protobufjs Type for encode/decode + field_name: field.name, // envelope oneof field name + field_id: field.id, // envelope field number = msg_type in frame + }; - var msg = root.lookupEnum("Version"); - if (!msg) throw "Missing Version enum in Version.Anon proto file"; - - g_ver_release_year = msg.values.DATAFED_RELEASE_YEAR; - g_ver_release_month = msg.values.DATAFED_RELEASE_MONTH; - g_ver_release_day = msg.values.DATAFED_RELEASE_DAY; - g_ver_release_hour = msg.values.DATAFED_RELEASE_HOUR; - g_ver_release_minute = msg.values.DATAFED_RELEASE_MINUTE; - - g_version = - g_ver_release_year + - "." + - g_ver_release_month + - "." + - g_ver_release_day + - "." + - g_ver_release_hour + - "." + - g_ver_release_minute; - - logger.info("protobuf.load", getCurrentLineNumber(), "Running Version: " + g_version); - if (--g_ready_start == 0) startServer(); -}); + g_msg_by_id[field.id] = entry; + g_msg_by_name[msgType.name] = entry; + }); -protobuf.load("SDMS_Anon.proto", function (err, root) { - if (err) throw err; + logger.info( + processEnvelope.name, + getCurrentLineNumber(), + "Loaded " + Object.keys(g_msg_by_id).length + " message types from envelope", + ); +} - var msg = root.lookupEnum("SDMS.Anon.Protocol"); - if (!msg) throw "Missing Protocol enum in SDMS.Anon proto file"; +var protobufRoot = new protobuf.Root(); - processProtoFile(msg); - if (--g_ready_start == 0) startServer(); -}); +protobufRoot.resolvePath = function (origin, target) { + return "proto3/" + target; +}; -protobuf.load("SDMS_Auth.proto", function (err, root) { +protobufRoot.load("envelope.proto", function (err, root) { if (err) throw err; - var msg = root.lookupEnum("SDMS.Auth.Protocol"); - if (!msg) throw "Missing Protocol enum in SDMS.Auth proto file"; - - processProtoFile(msg); + root.resolveAll(); + processEnvelope(root); if (--g_ready_start == 0) startServer(); }); @@ -2304,20 +2281,34 @@ g_core_sock.on( var mtype = (frame.readUInt8(4) << 8) | frame.readUInt8(5); var ctx = frame.readUInt16BE(6); - var msg_class = g_msg_by_id[mtype]; + var msg_info = g_msg_by_id[mtype]; var msg; + var msg_name = msg_info ? msg_info.type.name : "unknown(" + mtype + ")"; - if (msg_class) { + if (msg_info) { // Only try to decode if there is a payload if (msg_buf && msg_buf.length) { try { - // This is unserializing the protocol message - msg = msg_class.decode(msg_buf); + // Decode as Envelope (matches C++ receiveBody unwrapFromEnvelope) + var envelope = g_envelope_type.decode(msg_buf); + var which_field = envelope.payload; // oneof discriminator: field name that is set + if (which_field) { + msg = envelope[which_field]; + msg_name = which_field; + } else { + logger.warning( + "g_core_sock.on", + getCurrentLineNumber(), + "Envelope decoded but no payload field set, correlation_id: " + + correlation_id, + ); + msg = msg_info.type.create({}); + } if (!msg) { logger.error( "g_core_sock.on", getCurrentLineNumber(), - "ERROR: msg decode failed: no reason, correlation_id: " + + "ERROR: envelope decode produced null msg, correlation_id: " + correlation_id, ); } @@ -2325,11 +2316,15 @@ g_core_sock.on( logger.error( "g_core_sock.on", getCurrentLineNumber(), - "ERROR: msg decode failed: " + err + " correlation_id: " + correlation_id, + "ERROR: envelope decode failed: " + + err + + " correlation_id: " + + correlation_id, ); } } else { - msg = msg_class; + // No payload body - create empty message instance + msg = msg_info.type.create({}); } } else { logger.error( @@ -2345,7 +2340,7 @@ g_core_sock.on( logger.info( "g_core_sock.on", getCurrentLineNumber(), - "freed ctx: " + ctx + " for msg: " + msg_class.name, + "freed ctx: " + ctx + " for msg: " + msg_name, correlation_id, ); g_ctx_next = ctx; @@ -2360,7 +2355,7 @@ g_core_sock.on( " - msg type: " + mtype + ", name: " + - msg_class.name + + msg_name + " correlation_id: " + correlation_id, ); diff --git a/web/docker/Dockerfile b/web/docker/Dockerfile index 5fafb6b08..15cbbc2b1 100644 --- a/web/docker/Dockerfile +++ b/web/docker/Dockerfile @@ -39,7 +39,7 @@ COPY ./scripts/generate_ws_config.sh ${BUILD_DIR}/scripts/ COPY ./scripts/install_ws.sh ${BUILD_DIR}/scripts/ COPY ./scripts/export_dependency_version.sh ${BUILD_DIR}/scripts/ COPY ./cmake ${BUILD_DIR}/cmake -COPY ./common/proto ${BUILD_DIR}/common/proto +COPY ./common/proto3 ${BUILD_DIR}/common/proto3 COPY ./web ${BUILD_DIR}/web RUN ${DATAFED_DEPENDENCIES_ROOT}/scripts/generate_dependencies_config.sh && \ @@ -83,6 +83,7 @@ WORKDIR ${DATAFED_DIR} USER datafed +RUN mkdir -p ${DATAFED_DEFAULT_LOG_PATH}; chown root:datafed ${DATAFED_DEFAULT_LOG_PATH}; chmod -R g+rw ${DATAFED_DEFAULT_LOG_PATH} COPY --from=ws-build --chown=datafed:root ${DATAFED_DEPENDENCIES_ROOT}/scripts/ {DATAFED_DEPENDENCIES_ROOT}/scripts/ COPY --chown=datafed:root ./scripts/generate_datafed.sh ${BUILD_DIR}/scripts/generate_datafed.sh @@ -97,7 +98,7 @@ COPY --from=ws-build --chown=datafed:root ${DATAFED_DEPENDENCIES_ROOT}/scripts $ COPY --from=ws-build --chown=datafed:root ${DATAFED_INSTALL_PATH}/web ${DATAFED_INSTALL_PATH}/web COPY --from=ws-build --chown=datafed:root /usr/bin/curl /usr/bin/curl -WORKDIR ${BUILD_DIR} +WORKDIR ${BUILD_DIR}/web USER root diff --git a/web/static/api.js b/web/static/api.js index 3adf056f5..72af70138 100644 --- a/web/static/api.js +++ b/web/static/api.js @@ -184,17 +184,6 @@ export function dataPutCheck(a_id, a_cb) { _asyncGet("/api/dat/put?id=" + encodeURIComponent(a_id) + "&check=true", null, a_cb); } -export function dataGetDeps(a_ids, a_cb) { - _asyncGet("/api/dat/dep/get?ids=" + encodeURIComponent(a_ids), null, function (ok, data) { - if (ok) { - a_cb(data); - } else { - util.setStatusText("Get Dependencies Error: " + data, true); - a_cb(); - } - }); -} - export function dataGetDepGraph(a_id, a_cb) { _asyncGet("/api/dat/dep/graph/get?id=" + encodeURIComponent(a_id), null, function (ok, data) { if (ok) { @@ -240,25 +229,10 @@ export function sendDataDelete(a_ids, a_cb) { _asyncGet("/api/dat/delete?ids=" + encodeURIComponent(JSON.stringify(a_ids)), null, a_cb); } -export function copyData(a_src_id, a_dst_id, a_cb) { - _asyncGet( - "/api/dat/copy?src=" + - encodeURIComponent(a_src_id) + - "&dst=" + - encodeURIComponent(a_dst_id), - null, - a_cb, - ); -} - export function dataSearch(a_query, a_callback) { _asyncPost("/api/dat/search", a_query, a_callback); } -export function dataPubSearch(a_query, a_cb) { - _asyncPost("/api/col/pub/search/data", a_query, a_cb); -} - export function sendDataLock(a_ids, a_lock, a_cb) { _asyncGet( "/api/dat/lock?lock=" + a_lock + "&ids=" + encodeURIComponent(JSON.stringify(a_ids)), @@ -334,13 +308,6 @@ export function collDelete(a_ids, a_cb) { _asyncGet("/api/col/delete?ids=" + encodeURIComponent(JSON.stringify(a_ids)), null, a_cb); } -export function catalogSearch(a_query, a_cb) { - _asyncPost("/api/cat/search", a_query, a_cb); - /*_asyncPost( "/api/col/pub/search", a_query, function( ok, data ){ - setTimeout( function(){ a_cb( ok, data ); }, 2000 ); - });*/ -} - export function projList_url(a_owned, a_admin, a_member, a_sort, a_offset, a_count) { return ( "/api/prj/list?owner=" + @@ -852,25 +819,6 @@ export function topicListTopics(a_id, a_offset, a_count, a_cb) { if (!a_cb) return; _asyncGet(topicListTopics_url(a_id, a_offset, a_count), null, a_cb); - /*_asyncGet( topicListTopics_url( a_id, a_offset, a_count ), null, function( ok, data ){ - setTimeout( function(){ a_cb( ok, data ); }, 2000 ); - });*/ -} - -export function topicListColl_url(a_id, a_offset, a_count) { - return ( - "/api/top/list/coll?id=" + - a_id + - (a_offset != undefined && a_count != undefined - ? "&offset=" + a_offset + "&count=" + a_count - : "") - ); -} - -export function topicListColl(a_id, a_offset, a_count, a_cb) { - if (!a_cb) return; - - _asyncGet(topicListColl_url(a_id, a_offset, a_count), null, a_cb); } export function topicSearch_url(a_phrase) { diff --git a/web/version.js.in b/web/version.js.in index 69ec65aca..ca3eb3b97 100644 --- a/web/version.js.in +++ b/web/version.js.in @@ -2,5 +2,10 @@ const MAJOR = @DATAFED_WEB_MAJOR@; const MINOR = @DATAFED_WEB_MINOR@; const PATCH = @DATAFED_WEB_PATCH@; +const RELEASE_YEAR = @DATAFED_RELEASE_YEAR@; +const RELEASE_MONTH = @DATAFED_RELEASE_MONTH@; +const RELEASE_DAY = @DATAFED_RELEASE_DAY@; +const RELEASE_HOUR = @DATAFED_RELEASE_HOUR@; +const RELEASE_MINUTE = @DATAFED_RELEASE_MINUTE@; -export default { MAJOR, MINOR, PATCH }; +export default { MAJOR, MINOR, PATCH, RELEASE_YEAR, RELEASE_MONTH, RELEASE_DAY, RELEASE_HOUR, RELEASE_MINUTE }