From 0949f7a909e9b6a04817b0ea6b3132b7646ddc3d Mon Sep 17 00:00:00 2001 From: Huibert Alblas Date: Fri, 9 May 2025 13:38:35 +0200 Subject: [PATCH 01/10] Enforce access token timeout --- .../authentication/AuthenticationService.java | 5 + .../websocket/AbstractWebSocketHandler.java | 305 +++++++++++++++--- .../pulsar/websocket/ConsumerHandler.java | 142 ++++---- .../pulsar/websocket/ProducerHandler.java | 25 +- .../pulsar/websocket/ReaderHandler.java | 17 +- .../pulsar/websocket/data/AuthChallenge.java | 31 ++ .../pulsar/websocket/data/AuthData.java | 32 ++ .../pulsar/websocket/data/AuthResponse.java | 36 +++ .../pulsar/websocket/data/Challenge.java | 31 ++ .../websocket/data/CommandAuthChallenge.java | 32 ++ .../websocket/data/CommandAuthResponse.java | 32 ++ .../pulsar/websocket/data/CommandError.java | 32 ++ .../pulsar/websocket/data/ServerError.java | 59 ++++ .../data/protocol/PulsarWebsocketDecoder.java | 170 ++++++++++ .../websocket/data/protocol/package-info.java | 19 ++ 15 files changed, 815 insertions(+), 153 deletions(-) create mode 100644 pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/AuthChallenge.java create mode 100644 pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/AuthData.java create mode 100644 pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/AuthResponse.java create mode 100644 pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/Challenge.java create mode 100644 pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/CommandAuthChallenge.java create mode 100644 pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/CommandAuthResponse.java create mode 100644 pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/CommandError.java create mode 100644 pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ServerError.java create mode 100644 pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/protocol/PulsarWebsocketDecoder.java create mode 100644 pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/protocol/package-info.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java index e2bf4dcc0156d..0adeb57073887 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import javax.naming.AuthenticationException; import javax.servlet.http.HttpServletRequest; @@ -117,6 +118,10 @@ private AuthenticationProvider getAuthProvider(String authMethodName) throws Aut return providerToUse; } + public Set getAuthMethodNames() { + return providers.keySet(); + } + public boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response) throws Exception { String authMethodName = getAuthMethodName(request); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java index b6ed27c87b6ba..9310e7694db0d 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java @@ -19,6 +19,8 @@ package org.apache.pulsar.websocket; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.commons.lang3.StringUtils.EMPTY; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.ObjectWriter; import com.google.common.annotations.VisibleForTesting; @@ -29,7 +31,9 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import javax.servlet.http.HttpServletRequest; @@ -37,7 +41,9 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationState; +import org.apache.pulsar.broker.web.AuthenticationFilter; import org.apache.pulsar.client.api.PulsarClientException.AuthenticationException; import org.apache.pulsar.client.api.PulsarClientException.AuthorizationException; import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException; @@ -51,29 +57,44 @@ import org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException; import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException; import org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException; +import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.websocket.data.AuthChallenge; +import org.apache.pulsar.websocket.data.AuthResponse; +import org.apache.pulsar.websocket.data.Challenge; +import org.apache.pulsar.websocket.data.CommandAuthChallenge; +import org.apache.pulsar.websocket.data.CommandAuthResponse; +import org.apache.pulsar.websocket.data.CommandError; import org.apache.pulsar.websocket.data.ConsumerCommand; +import org.apache.pulsar.websocket.data.ServerError; +import org.apache.pulsar.websocket.data.protocol.PulsarWebsocketDecoder; import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.WebSocketAdapter; import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class AbstractWebSocketHandler extends WebSocketAdapter implements Closeable { +public abstract class AbstractWebSocketHandler extends PulsarWebsocketDecoder implements Closeable { protected final WebSocketService service; protected final HttpServletRequest request; protected TopicName topic; protected final Map queryParams; - private static final String PULSAR_AUTH_METHOD_NAME = "X-Pulsar-Auth-Method-Name"; protected final ObjectReader consumerCommandReader = ObjectMapperFactory.getMapper().reader().forType(ConsumerCommand.class); + private AuthenticationState authState; + private AuthenticationDataSource authData; + private String authRole = null; + private String authMethod = "none"; + private boolean pendingAuthChallengeResponse = false; + private int clientProtocolVersion = 0; + private ScheduledFuture pingFuture; + private ScheduledFuture authRefreshFuture; public AbstractWebSocketHandler(WebSocketService service, HttpServletRequest request, @@ -88,66 +109,121 @@ public AbstractWebSocketHandler(WebSocketService service, extractTopicName(request); } - protected boolean checkAuth(ServletUpgradeResponse response) { - String authRole = ""; - String authMethodName = request.getHeader(PULSAR_AUTH_METHOD_NAME); - AuthenticationState authenticationState = null; - if (service.isAuthenticationEnabled()) { - try { - if (authMethodName != null - && service.getAuthenticationService().getAuthenticationProvider(authMethodName) != null) { - authenticationState = service.getAuthenticationService() - .getAuthenticationProvider(authMethodName).newHttpAuthState(request); - } - if (authenticationState != null) { - authRole = service.getAuthenticationService() - .authenticateHttpRequest(request, authenticationState.getAuthDataSource()); - } else { - authRole = service.getAuthenticationService().authenticateHttpRequest(request); - } - log.info("[{}:{}] Authenticated WebSocket client {} on topic {}", request.getRemoteAddr(), - request.getRemotePort(), authRole, topic); + private String getAuthMethodName(HttpServletRequest request) { + return request.getHeader(AuthenticationFilter.PULSAR_AUTH_METHOD_NAME); + } + + private boolean checkAuthentication(ServletUpgradeResponse response) { + if (!service.isAuthenticationEnabled()) { + return true; + } + try { + String authMethodNameHeader = getAuthMethodName(request); - } catch (javax.naming.AuthenticationException e) { - log.warn("[{}:{}] Failed to authenticated WebSocket client {} on topic {}: {}", request.getRemoteAddr(), - request.getRemotePort(), authRole, topic, e.getMessage()); + if (authMethodNameHeader != null) { + AuthenticationProvider providerToUse = service.getAuthenticationService() + .getAuthenticationProvider(authMethodNameHeader); try { - response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Failed to authenticate"); - } catch (IOException e1) { - log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(), - e1.getMessage(), e1); + AuthenticationState authenticationState = providerToUse.newHttpAuthState(request); + authData = authenticationState.getAuthDataSource(); + authRole = providerToUse.authenticateAsync(authData).get(); + authState = authenticationState; + authMethod = authMethodNameHeader; + return true; + } catch (javax.naming.AuthenticationException e) { + if (log.isDebugEnabled()) { + log.debug("Authentication failed for provider " + authMethodNameHeader + " : " + + e.getMessage(), e); + } + throw e; + } catch (ExecutionException | InterruptedException e) { + if (log.isDebugEnabled()) { + log.debug("Authentication failed for provider " + authMethodNameHeader + " : " + + e.getMessage(), e); + } + throw new RuntimeException(e); + } + } else { + Set authMethodNames = service.getAuthenticationService().getAuthMethodNames(); + for (String authMethodName : authMethodNames) { + try { + AuthenticationProvider provider = service.getAuthenticationService() + .getAuthenticationProvider(authMethodName); + AuthenticationState authenticationState = provider.newHttpAuthState(request); + String authenticationRole = provider + .authenticateAsync(authenticationState.getAuthDataSource()) + .get(); + + authState = authenticationState; + authRole = authenticationRole; + authMethod = authMethodName; + return true; + } catch (ExecutionException | InterruptedException | javax.naming.AuthenticationException e) { + if (log.isDebugEnabled()) { + log.debug("Authentication failed for provider " + authMethodName + ": " + + e.getMessage(), e); + } + // Ignore the exception because we don't know which authentication method is + // expected here. + } } - return false; } + log.info("[{}:{}] Authenticated WebSocket client {} on topic {}", request.getRemoteAddr(), + request.getRemotePort(), authRole, topic); + } catch (javax.naming.AuthenticationException e) { + log.warn("[{}:{}] Failed to authenticated WebSocket client {} on topic {}: {}", request.getRemoteAddr(), + request.getRemotePort(), authRole, topic, e.getMessage()); + try { + response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Failed to authenticate"); + } catch (IOException e1) { + log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(), + e1.getMessage(), e1); + } + return false; } + return false; + } - if (service.isAuthorizationEnabled()) { - AuthenticationDataSource authenticationData; - if (authenticationState != null) { - authenticationData = authenticationState.getAuthDataSource(); - } else { - authenticationData = new AuthenticationDataHttps(request); + private boolean checkAuthorization(ServletUpgradeResponse response) { + if (!service.isAuthorizationEnabled()) { + return true; + } + + AuthenticationDataSource authenticationData; + if (authState != null) { + authenticationData = authState.getAuthDataSource(); + } else { + authenticationData = new AuthenticationDataHttps(request); + } + try { + if (isAuthorized(authRole, authenticationData)) { + return true; } + } catch (Exception e) { + log.warn("[{}:{}] Got an exception when authorizing WebSocket client {} on topic {} on: {}", + request.getRemoteAddr(), request.getRemotePort(), authRole, topic, e.getMessage()); try { - if (!isAuthorized(authRole, authenticationData)) { - log.warn("[{}:{}] WebSocket Client [{}] is not authorized on topic {}", request.getRemoteAddr(), - request.getRemotePort(), authRole, topic); - response.sendError(HttpServletResponse.SC_FORBIDDEN, "Not authorized"); - return false; - } - } catch (Exception e) { - log.warn("[{}:{}] Got an exception when authorizing WebSocket client {} on topic {} on: {}", - request.getRemoteAddr(), request.getRemotePort(), authRole, topic, e.getMessage()); - try { - response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Server error"); - } catch (IOException e1) { - log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(), - e1.getMessage(), e1); - } - return false; + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Server error"); + } catch (IOException e1) { + log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(), + e1.getMessage(), e1); } + return false; } - return true; + + try { + log.warn("[{}:{}] WebSocket Client [{}] is not authorized on topic {}", request.getRemoteAddr(), + request.getRemotePort(), authRole, topic); + response.sendError(HttpServletResponse.SC_FORBIDDEN, "Not authorized"); + } catch (IOException e1) { + log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(), + e1.getMessage(), e1); + } + return false; + } + + protected boolean checkAuth(ServletUpgradeResponse response) { + return checkAuthentication(response) && checkAuthorization(response); } protected static int getErrorCode(Exception e) { @@ -188,6 +264,27 @@ private void closePingFuture() { } } + private void closeAuthRefreshFuture() { + if (authRefreshFuture != null && !authRefreshFuture.isDone()) { + authRefreshFuture.cancel(true); + } + } + + public static CommandAuthChallenge newAuthChallenge(String authMethodName, AuthData brokerData, + int protocolVersion) { + + CommandAuthChallenge commandAuthChallenge = new CommandAuthChallenge(); + AuthChallenge authChallenge = new AuthChallenge(); + Challenge challenge = new Challenge(authMethodName, authMethodName); + + authChallenge.setProtocolVersion(protocolVersion); + commandAuthChallenge.setAuthChallenge(authChallenge); + + authChallenge.setChallenge(challenge); + + return commandAuthChallenge; + } + @Override public void onWebSocketConnect(Session session) { super.onWebSocketConnect(session); @@ -201,15 +298,112 @@ public void onWebSocketConnect(Session session) { } }, webSocketPingDurationSeconds, webSocketPingDurationSeconds, TimeUnit.SECONDS); } + int authenticationRefreshCheckSeconds = service.getConfig().getAuthenticationRefreshCheckSeconds(); + if (authenticationRefreshCheckSeconds > 0) { + authRefreshFuture = service.getExecutor().scheduleAtFixedRate(() -> { + try { + if (!authState.isExpired()) { + // Credentials are still valid. Nothing to do at this point + return; + } + + if (pendingAuthChallengeResponse) { + log.warn("[{}] Closing connection after timeout on refreshing auth credentials", + session.getRemoteAddress()); + session.close(); + return; + } + + log.info("[{}] Refreshing authentication credentials for authRole {}", + getSession().getRemoteAddress(), this.authRole); + AuthData brokerData = authState.refreshAuthentication(); + CommandAuthChallenge commandAuthChallenge = newAuthChallenge(authMethod, brokerData, + clientProtocolVersion); + String commandAuthChallengeString = objectWriter().writeValueAsString(commandAuthChallenge); + + session.getRemote().sendString(commandAuthChallengeString); + + if (log.isDebugEnabled()) { + log.debug("[{}] Sent auth challenge to client to refresh credentials with method: {}.", + getSession().getRemoteAddress(), authMethod); + } + + pendingAuthChallengeResponse = true; + } catch (javax.naming.AuthenticationException e) { + log.warn("[{}] Failed to refresh authentication: {}", getSession().getRemoteAddress(), e); + session.close(); + } catch (Exception e) { + log.warn("[{}] Websocket Auth refresh general Exception", getSession().getRemoteAddress(), e); + + } + }, authenticationRefreshCheckSeconds, authenticationRefreshCheckSeconds, TimeUnit.SECONDS); + } log.info("[{}] New WebSocket session on topic {}", session.getRemoteAddress(), topic); } + @Override + protected void handleAuthResponse(CommandAuthResponse commandAuthResponse) { + AuthResponse authResponse = commandAuthResponse.getAuthResponse(); + pendingAuthChallengeResponse = false; + + if (log.isDebugEnabled()) { + log.debug("Received AuthResponse from {}, auth method: {}", + getRemote().getInetSocketAddress(), authResponse.getResponse().getAuthMethodName()); + } + + try { + AuthData clientData = AuthData + .of(authResponse.getResponse().getAuthData().getBytes(StandardCharsets.UTF_8)); + doAuthentication(clientData, false, authResponse.getProtocolVersion(), + authResponse.hasClientVersion() ? authResponse.getClientVersion() : EMPTY); + } catch (Exception e) { + log.warn("[{}] Websocket handleAuthResponse general Exception", getSession().getRemoteAddress(), e); + authenticationFailed(e); + } + } + + // According to auth result, send Connected, AuthChallenge, or Error command. + private void doAuthentication(AuthData clientData, + boolean useOriginalAuthState, + int clientProtocolVersion, + final String clientVersion) { + + if (log.isDebugEnabled()) { + log.debug("Authenticate using original auth state : {}, role = {}", useOriginalAuthState, authRole); + } + + authState + .authenticateAsync(clientData) + .whenCompleteAsync((authChallenge, throwable) -> { + if (throwable != null) { + authenticationFailed(throwable); + } + }, service.getExecutor()); + } + + // Handle authentication and authentication refresh failures. Must be called + // from event loop. + private void authenticationFailed(Throwable t) { + try { + CommandError commandError = new CommandError(-1, ServerError.AuthenticationError, "Failed to authenticate"); + String commandErrorString = objectWriter().writeValueAsString(commandError); + getSession().getRemote().sendString(commandErrorString); + } catch (JsonProcessingException e) { + log.error("[{}] Error in sending authentication failure message: {}", + getRemote().getInetSocketAddress(), e); + } catch (IOException e) { + log.error("[{}] Error in sending authentication failure message: {}", + getRemote().getInetSocketAddress(), e); + } + } + @Override public void onWebSocketError(Throwable cause) { super.onWebSocketError(cause); log.info("[{}] WebSocket error on topic {} : {}", getSession().getRemoteAddress(), topic, cause.getMessage()); try { closePingFuture(); + closeAuthRefreshFuture(); close(); } catch (IOException e) { log.error("Failed in closing WebSocket session for topic {} with error: {}", topic, e.getMessage()); @@ -222,6 +416,7 @@ public void onWebSocketClose(int statusCode, String reason) { topic, statusCode, reason); try { closePingFuture(); + closeAuthRefreshFuture(); close(); } catch (IOException e) { log.warn("[{}] Failed to close handler for topic {}. ", getSession().getRemoteAddress(), topic, e); @@ -295,6 +490,10 @@ public ScheduledFuture getPingFuture() { return pingFuture; } + @VisibleForTesting + public ScheduledFuture getAuthRefreshFuture() { + return authRefreshFuture; + } protected abstract Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception; diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index b93c4b215108e..9979b2e7583bd 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -233,31 +233,9 @@ public void onWebSocketConnect(Session session) { } } - @Override - public void onWebSocketText(String message) { - super.onWebSocketText(message); - - try { - ConsumerCommand command = consumerCommandReader.readValue(message); - if ("permit".equals(command.type)) { - handlePermit(command); - } else if ("unsubscribe".equals(command.type)) { - handleUnsubscribe(command); - } else if ("negativeAcknowledge".equals(command.type)) { - handleNack(command); - } else if ("isEndOfTopic".equals(command.type)) { - handleEndOfTopic(); - } else { - handleAck(command); - } - } catch (IOException e) { - log.warn("Failed to deserialize message id: {}", message, e); - close(WebSocketError.FailedToDeserializeFromJSON); - } - } - // Check and notify consumer if reached end of topic. - private void handleEndOfTopic() { + @Override + protected void handleEndOfTopic() { if (log.isDebugEnabled()) { log.debug("[{}/{}] Received check reach the end of topic request from {} ", consumer.getTopic(), subscription, getRemote().getInetSocketAddress().toString()); @@ -288,12 +266,18 @@ public void writeSuccess() { } } - private void handleUnsubscribe(ConsumerCommand command) throws PulsarClientException { - if (log.isDebugEnabled()) { - log.debug("[{}/{}] Received unsubscribe request from {} ", consumer.getTopic(), - subscription, getRemote().getInetSocketAddress().toString()); + @Override + protected void handleUnsubscribe(ConsumerCommand command) { + try { + if (log.isDebugEnabled()) { + log.debug("[{}/{}] Received unsubscribe request from {} ", consumer.getTopic(), + subscription, getRemote().getInetSocketAddress().toString()); + } + consumer.unsubscribe(); + } catch (PulsarClientException e) { + log.warn("Failed to deserialize message id: {}", command, e); + close(WebSocketError.FailedToDeserializeFromJSON); } - consumer.unsubscribe(); } private void checkResumeReceive() { @@ -306,55 +290,73 @@ private void checkResumeReceive() { } } - private void handleAck(ConsumerCommand command) throws IOException { - // We should have received an ack - MessageId msgId = MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)); - if (log.isDebugEnabled()) { - log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(), - subscription, msgId, getRemote().getInetSocketAddress().toString()); - } + @Override + protected void handleAck(ConsumerCommand command) { + try { + // We should have received an ack + MessageId msgId = MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)); + if (log.isDebugEnabled()) { + log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(), + subscription, msgId, getRemote().getInetSocketAddress().toString()); + } - MessageId originalMsgId = messageIdCache.asMap().remove(command.messageId); - if (originalMsgId != null) { - consumer.acknowledgeAsync(originalMsgId).thenAccept(consumer -> numMsgsAcked.increment()); - } else { - consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment()); - } + MessageId originalMsgId = messageIdCache.asMap().remove(command.messageId); + if (originalMsgId != null) { + consumer.acknowledgeAsync(originalMsgId).thenAccept(consumer -> numMsgsAcked.increment()); + } else { + consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment()); + } - checkResumeReceive(); + checkResumeReceive(); + } catch (IOException e) { + log.warn("Failed to deserialize message id: {}", command, e); + close(WebSocketError.FailedToDeserializeFromJSON); + } } - private void handleNack(ConsumerCommand command) throws IOException { - MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId), - topic.toString()); - if (log.isDebugEnabled()) { - log.debug("[{}/{}] Received negative ack request of message {} from {} ", consumer.getTopic(), - subscription, msgId, getRemote().getInetSocketAddress().toString()); - } + @Override + protected void handleNack(ConsumerCommand command) { + try { + MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId), + topic.toString()); + if (log.isDebugEnabled()) { + log.debug("[{}/{}] Received negative ack request of message {} from {} ", consumer.getTopic(), + subscription, msgId, getRemote().getInetSocketAddress().toString()); + } - MessageId originalMsgId = messageIdCache.asMap().remove(command.messageId); - if (originalMsgId != null) { - consumer.negativeAcknowledge(originalMsgId); - } else { - consumer.negativeAcknowledge(msgId); + MessageId originalMsgId = messageIdCache.asMap().remove(command.messageId); + if (originalMsgId != null) { + consumer.negativeAcknowledge(originalMsgId); + } else { + consumer.negativeAcknowledge(msgId); + } + checkResumeReceive(); + } catch (IOException e) { + log.warn("Failed to deserialize message id: {}", command, e); + close(WebSocketError.FailedToDeserializeFromJSON); } - checkResumeReceive(); } - private void handlePermit(ConsumerCommand command) throws IOException { - if (log.isDebugEnabled()) { - log.debug("[{}/{}] Received {} permits request from {} ", consumer.getTopic(), - subscription, command.permitMessages, getRemote().getInetSocketAddress().toString()); - } - if (command.permitMessages == null) { - throw new IOException("Missing required permitMessages field for 'permit' command"); - } - if (this.pullMode) { - int pending = pendingMessages.getAndAdd(-command.permitMessages); - if (pending >= 0) { - // Resume delivery - receiveMessage(); + @Override + protected void handlePermit(ConsumerCommand command) { + try { + if (log.isDebugEnabled()) { + log.debug("[{}/{}] Received {} permits request from {} ", consumer.getTopic(), + subscription, command.permitMessages, getRemote().getInetSocketAddress().toString()); + } + if (command.permitMessages == null) { + throw new IOException("Missing required permitMessages field for 'permit' command"); + } + if (this.pullMode) { + int pending = pendingMessages.getAndAdd(-command.permitMessages); + if (pending >= 0) { + // Resume delivery + receiveMessage(); + } } + } catch (IOException e) { + log.warn("Failed to deserialize message id: {}", command, e); + close(WebSocketError.FailedToDeserializeFromJSON); } } @@ -528,4 +530,4 @@ public String extractSubscription(HttpServletRequest request) { } private static final Logger log = LoggerFactory.getLogger(ConsumerHandler.class); -} +} \ No newline at end of file diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java index 3c0f42935e6bb..2fac8155a8755 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java @@ -21,13 +21,10 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; -import static org.apache.pulsar.websocket.WebSocketError.FailedToDeserializeFromJSON; import static org.apache.pulsar.websocket.WebSocketError.PayloadEncodingError; import static org.apache.pulsar.websocket.WebSocketError.UnknownError; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectReader; import com.google.common.base.Enums; import java.io.IOException; import java.time.format.DateTimeParseException; @@ -54,6 +51,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; +import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey; import org.apache.pulsar.common.api.proto.KeyValue; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.util.DateFormatter; @@ -91,8 +89,6 @@ public class ProducerHandler extends AbstractWebSocketHandler { public static final List ENTRY_LATENCY_BUCKETS_USEC = Collections.unmodifiableList(Arrays.asList( 500L, 1_000L, 5_000L, 10_000L, 20_000L, 50_000L, 100_000L, 200_000L, 1000_000L)); - private final ObjectReader producerMessageReader = - ObjectMapperFactory.getMapper().reader().forType(ProducerMessage.class); public ProducerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) { super(service, request, response); @@ -156,29 +152,26 @@ public void close() throws IOException { } @Override - public void onWebSocketText(String message) { + protected void handleMessage(ProducerMessage sendRequest) { if (log.isDebugEnabled()) { log.debug("[{}] Received new message from producer {} ", producer.getTopic(), getRemote().getInetSocketAddress().toString()); } - ProducerMessage sendRequest; byte[] rawPayload = null; String requestContext = null; try { - sendRequest = producerMessageReader.readValue(message); requestContext = sendRequest.context; + if (sendRequest.payload == null) { + // Null payload + sendAckResponse(new ProducerAck(PayloadEncodingError, "Empty payload", null, + requestContext)); + return; + } rawPayload = Base64.getDecoder().decode(sendRequest.payload); - } catch (IOException e) { - sendAckResponse(new ProducerAck(FailedToDeserializeFromJSON, e.getMessage(), null, null)); - return; } catch (IllegalArgumentException e) { String msg = format("Invalid Base64 message-payload error=%s", e.getMessage()); sendAckResponse(new ProducerAck(PayloadEncodingError, msg, null, requestContext)); return; - } catch (NullPointerException e) { - // Null payload - sendAckResponse(new ProducerAck(PayloadEncodingError, e.getMessage(), null, requestContext)); - return; } final long msgSize = rawPayload.length; @@ -532,4 +525,4 @@ private void printLogIfSettingDiscardedCompressionParams() { private static final Logger log = LoggerFactory.getLogger(ProducerHandler.class); -} +} \ No newline at end of file diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java index 2f985b2076da2..2c007c925551d 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java @@ -43,7 +43,6 @@ import org.apache.pulsar.client.impl.ReaderImpl; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.util.DateFormatter; -import org.apache.pulsar.websocket.data.ConsumerCommand; import org.apache.pulsar.websocket.data.ConsumerMessage; import org.apache.pulsar.websocket.data.EndOfTopicResponse; import org.eclipse.jetty.websocket.api.Session; @@ -210,17 +209,6 @@ public void onWebSocketConnect(Session session) { public void onWebSocketText(String message) { super.onWebSocketText(message); - try { - ConsumerCommand command = consumerCommandReader.readValue(message); - if ("isEndOfTopic".equals(command.type)) { - handleEndOfTopic(); - return; - } - } catch (IOException e) { - log.warn("Failed to deserialize message id: {}", message, e); - close(WebSocketError.FailedToDeserializeFromJSON); - } - // We should have received an ack // but reader doesn't send an ack to broker here because already reader did @@ -232,7 +220,8 @@ public void onWebSocketText(String message) { } // Check and notify reader if reached end of topic. - private void handleEndOfTopic() { + @Override + protected void handleEndOfTopic() { try { String msg = objectWriter().writeValueAsString( new EndOfTopicResponse(reader.hasReachedEndOfTopic())); @@ -353,4 +342,4 @@ private MessageId getMessageId() throws IOException { private static final Logger log = LoggerFactory.getLogger(ReaderHandler.class); -} +} \ No newline at end of file diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/AuthChallenge.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/AuthChallenge.java new file mode 100644 index 0000000000000..87bc73f360483 --- /dev/null +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/AuthChallenge.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket.data; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class AuthChallenge { + public Challenge challenge; + public int protocolVersion; +} diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/AuthData.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/AuthData.java new file mode 100644 index 0000000000000..cd5ff4ac90237 --- /dev/null +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/AuthData.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket.data; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class AuthData { + public String authMethodName; + public String authData; + +} \ No newline at end of file diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/AuthResponse.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/AuthResponse.java new file mode 100644 index 0000000000000..5243490dced8b --- /dev/null +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/AuthResponse.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket.data; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class AuthResponse { + public String clientVersion; + public AuthData response; + public int protocolVersion; + + public boolean hasClientVersion() { + return clientVersion != null && !clientVersion.isEmpty(); + } +} diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/Challenge.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/Challenge.java new file mode 100644 index 0000000000000..598318a90a6af --- /dev/null +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/Challenge.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket.data; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Challenge { + public String authMethodName; + public String authData; +} diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/CommandAuthChallenge.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/CommandAuthChallenge.java new file mode 100644 index 0000000000000..82fcee6f13efa --- /dev/null +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/CommandAuthChallenge.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket.data; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class CommandAuthChallenge { + public String type = "AUTH_CHALLENGE"; + public AuthChallenge authChallenge; + +} diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/CommandAuthResponse.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/CommandAuthResponse.java new file mode 100644 index 0000000000000..9b930a0cacd0a --- /dev/null +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/CommandAuthResponse.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket.data; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class CommandAuthResponse { + public String type; + public AuthResponse authResponse; + +} diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/CommandError.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/CommandError.java new file mode 100644 index 0000000000000..814e64126adb4 --- /dev/null +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/CommandError.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket.data; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class CommandError { + public long requestId; + public ServerError error; + public String message; +} diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ServerError.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ServerError.java new file mode 100644 index 0000000000000..06ae2d1d845f4 --- /dev/null +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ServerError.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket.data; + +public enum ServerError { + UnknownError, + MetadataError, // Error with ZK/metadata + PersistenceError, // Error writing reading from BK + AuthenticationError, // Non valid authentication + AuthorizationError, // Not authorized to use resource + + ConsumerBusy, // Unable to subscribe/unsubscribe because + // other consumers are connected + ServiceNotReady, // Any error that requires client retry operation with a fresh lookup + ProducerBlockedQuotaExceededError, // Unable to create producer because backlog quota exceeded + ProducerBlockedQuotaExceededException, // Exception while creating producer because quota exceeded + ChecksumError, // Error while verifying message checksum + UnsupportedVersionError, // Error when an older client/version doesn't support a required feature + TopicNotFound, // Topic not found + SubscriptionNotFound, // Subscription not found + ConsumerNotFound, // Consumer not found + TooManyRequests, // Error with too many simultaneously request + TopicTerminatedError, // The topic has been terminated + + ProducerBusy, // Producer with same name is already connected + InvalidTopicName, // The topic name is not valid + + IncompatibleSchema, // Specified schema was incompatible with topic schema + ConsumerAssignError, // Dispatcher assign consumer error + + TransactionCoordinatorNotFound, // Transaction coordinator not found error + InvalidTxnStatus, // Invalid txn status error + NotAllowedError, // Not allowed error + + TransactionConflict, // Ack with transaction conflict + TransactionNotFound, // Transaction not found + + ProducerFenced // When a producer asks and fail to get exclusive producer access, + // or loses the eclusive status after a reconnection, the broker will + // use this error to indicate that this producer is now permanently + // fenced. Applications are now supposed to close it and create a + // new producer +} diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/protocol/PulsarWebsocketDecoder.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/protocol/PulsarWebsocketDecoder.java new file mode 100644 index 0000000000000..6132b0ad690fd --- /dev/null +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/protocol/PulsarWebsocketDecoder.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket.data.protocol; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import io.netty.buffer.ByteBuf; +import java.io.IOException; +import org.apache.pulsar.common.protocol.PulsarDecoder; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.websocket.data.CommandAuthChallenge; +import org.apache.pulsar.websocket.data.CommandAuthResponse; +import org.apache.pulsar.websocket.data.ConsumerCommand; +import org.apache.pulsar.websocket.data.ProducerMessage; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.api.WriteCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class PulsarWebsocketDecoder extends WebSocketAdapter { + + private final ObjectReader commandAuthChallengeReader = ObjectMapperFactory.getMapper().reader() + .forType(CommandAuthChallenge.class); + private final ObjectReader commandAuthResponseReader = ObjectMapperFactory.getMapper().reader() + .forType(CommandAuthResponse.class); + private final ObjectReader producerMessageReader = ObjectMapperFactory.getMapper().reader() + .forType(ProducerMessage.class); + protected final ObjectReader consumerCommandReader = ObjectMapperFactory.getMapper().reader() + .forType(ConsumerCommand.class); + private String remoteAddress = null; + + private String getRemoteAddress() { + if (getSession() != null && getSession().getRemoteAddress() != null) { + remoteAddress = getSession().getRemoteAddress().toString(); + } + return remoteAddress; + } + + @Override + public void onWebSocketText(String msg) { + try { + if (log.isDebugEnabled()){ + log.debug("[{}] Message received: {}", getRemoteAddress(), msg); + } + ObjectMapper mapper = new ObjectMapper(); + JsonNode json = mapper.readTree(msg); + + if (json.has("type")) { + + switch (json.get("type").asText()) { + case "AUTH_CHALLENGE": + handleAuthChallenge(commandAuthChallengeReader.readValue(msg)); + break; + case "AUTH_RESPONSE": + handleAuthResponse(commandAuthResponseReader.readValue(msg)); + break; + case "permit": + handlePermit(consumerCommandReader.readValue(msg)); + break; + case "unsubscribe": + handleUnsubscribe(consumerCommandReader.readValue(msg)); + break; + case "negativeAcknowledge": + handleNack(consumerCommandReader.readValue(msg)); + break; + case "isEndOfTopic": + handleEndOfTopic(); + break; + default: + handleAck(consumerCommandReader.readValue(msg)); + + } + } else if (json.has("messageId")) { + handleAck(consumerCommandReader.readValue(msg)); + } else { + handleMessage(producerMessageReader.readValue(msg)); + } + } catch (JsonMappingException e) { + log.warn("[{}] Message received JsonMappingException {}", getRemoteAddress(), + e); + } catch (JsonProcessingException e) { + log.warn("[{}] Message received JsonProcessingException {}", getRemoteAddress(), + e); + } catch (Exception e) { + log.warn("[{}] Message received Exception {}", getRemoteAddress(), + e); + } + } + + protected void handleMessage(ProducerMessage cmdMessage) { + throw new UnsupportedOperationException(); + } + + protected void handleAuthResponse(CommandAuthResponse commandAuthResponse) { + throw new UnsupportedOperationException(); + } + + protected void handleAuthChallenge(CommandAuthChallenge commandAuthChallenge) { + throw new UnsupportedOperationException(); + } + + protected void handlePermit(ConsumerCommand command) { + throw new UnsupportedOperationException(); + } + + protected void handleUnsubscribe(ConsumerCommand command) { + throw new UnsupportedOperationException(); + } + + protected void handleNack(ConsumerCommand command) { + throw new UnsupportedOperationException(); + } + + protected void handleEndOfTopic() { + throw new UnsupportedOperationException(); + } + + protected void handleAck(ConsumerCommand command) { + throw new UnsupportedOperationException(); + } + + private static final Logger log = LoggerFactory.getLogger(PulsarDecoder.class); + + private void writeAndFlush(ByteBuf cmd) { + + try { + getRemote().sendBytes(cmd.nioBuffer()); + } catch (IOException e) { + log.warn("[{}] Unable to send command {}", getRemoteAddress(), e); + } + } + + public void writeAndFlushWithClosePromise(ByteBuf cmd) { + getRemote().sendBytes(cmd.nioBuffer(), new WriteCallback() { + + @Override + public void writeFailed(Throwable x) { + + getSession().close(); + + } + + @Override + public void writeSuccess() { + getSession().close(); + } + + }); + } + +} diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/protocol/package-info.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/protocol/package-info.java new file mode 100644 index 0000000000000..60247eb3fceb1 --- /dev/null +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/protocol/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket.data.protocol; From d092b88b315d60c1c108e7642e233fe5ca899632 Mon Sep 17 00:00:00 2001 From: Huibert Alblas Date: Fri, 9 May 2025 16:24:06 +0200 Subject: [PATCH 02/10] Added PulsarWebsocketDecoderTest --- .../protocol/PulsarWebsocketDecoderTest.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 pulsar-websocket/src/test/java/org/apache/pulsar/websocket/data/protocol/PulsarWebsocketDecoderTest.java diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/data/protocol/PulsarWebsocketDecoderTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/data/protocol/PulsarWebsocketDecoderTest.java new file mode 100644 index 0000000000000..77565e1489b44 --- /dev/null +++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/data/protocol/PulsarWebsocketDecoderTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket.data.protocol; + +import org.apache.pulsar.websocket.data.CommandAuthResponse; +import org.testng.Assert; +import org.testng.annotations.Test; + +import lombok.Getter; + +public class PulsarWebsocketDecoderTest { + + static class MockedPulsarWebsocketDecoder extends PulsarWebsocketDecoder { + + @Getter + private CommandAuthResponse commandAuthResponse; + + @Override + public void handleAuthResponse(CommandAuthResponse commandAuthResponse) { + this.commandAuthResponse = commandAuthResponse; + } + } + + @Test + public void testHandleAuthResponseWithoutClientVersion() throws Exception { + String message = "{\"type\" : \"AUTH_RESPONSE\", \"authResponse\" : {\"clientVersion\" : \"v21\", \"protocolVersion\" : 21, \"response\" : {\"authMethodName\":\"token\", \"authData\": \"NewAccessToken\"}}}"; + + MockedPulsarWebsocketDecoder websocketDecoder = new MockedPulsarWebsocketDecoder(); + + websocketDecoder.onWebSocketText(message); + Assert.assertEquals( + websocketDecoder.getCommandAuthResponse().getAuthResponse().getResponse().getAuthMethodName(), "token"); + Assert.assertEquals(websocketDecoder.getCommandAuthResponse().getAuthResponse().getResponse().getAuthData(), + "NewAccessToken"); + } +} From d165cdfb2232ecba3110a086780cfb83e67f9793 Mon Sep 17 00:00:00 2001 From: Huibert Alblas Date: Wed, 28 May 2025 10:57:33 +0200 Subject: [PATCH 03/10] Added test --- .../service/WebSocketProxyConfiguration.java | 4 ++ .../AbstractWebSocketHandlerTest.java | 65 +++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java index 31a1adc291553..5d57a940aa21d 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java @@ -185,6 +185,10 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration { @FieldContext(doc = "Interval of time to sending the ping to keep alive. This value greater than 0 means enabled") private int webSocketPingDurationSeconds = -1; + @FieldContext(doc = "Interval of time to check if the access token is still valid." + + " This value greater than 0 means enabled") + private int authenticationRefreshCheckSeconds = -1; + @FieldContext(doc = "When this parameter is not empty, unauthenticated users perform as anonymousUserRole") private String anonymousUserRole = null; diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java index d21e1176f571d..b03e9406fd5b8 100644 --- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java +++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java @@ -19,13 +19,20 @@ package org.apache.pulsar.websocket; import static com.google.common.base.Preconditions.checkArgument; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.after; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.net.InetSocketAddress; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -39,9 +46,14 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import lombok.Cleanup; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Splitter; import lombok.Getter; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationState; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.HashingScheme; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -463,4 +475,57 @@ public void testPingFuture() throws IOException { webSocketHandler.onWebSocketError(new RuntimeException("INTERNAL_SERVER_ERROR_500")); assertTrue(pingFuture.isDone()); } + + @Test + public void testDropSessionAfterTokenExpiration() + throws IOException, IllegalArgumentException, IllegalAccessException, NoSuchFieldException, + SecurityException, InstantiationException, InvocationTargetException, NoSuchMethodException { + + WebSocketProxyConfiguration webSocketProxyConfiguration = new WebSocketProxyConfiguration(); + webSocketProxyConfiguration.setAuthenticationEnabled(true); + webSocketProxyConfiguration.setAuthenticationRefreshCheckSeconds(1); + + WebSocketService webSocketService = spy(new WebSocketService(webSocketProxyConfiguration)); + + HttpServletRequest httpServletRequest = mock(HttpServletRequest.class); + String consumerV2 = "/ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription"; + when(httpServletRequest.getRequestURI()).thenReturn(consumerV2); + + MockedServletUpgradeResponse response = new MockedServletUpgradeResponse(null); + AbstractWebSocketHandler webSocketHandler = new WebSocketHandlerImpl(webSocketService, httpServletRequest, + response); + AuthenticationState authState = mock(AuthenticationState.class); + when(authState.isExpired()).thenReturn(false).thenReturn(false).thenReturn(true); + + Class webSocketHandlerClass = AbstractWebSocketHandler.class; + Field field = webSocketHandlerClass.getDeclaredField("authState"); + field.setAccessible(true); + field.set(webSocketHandler, authState); + + Session session = mock(Session.class); + RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class); + when(session.getRemote()).thenReturn(remoteEndpoint); + when(session.getRemoteAddress()).thenReturn(InetSocketAddress.createUnresolved("localhost", 20)); + + webSocketHandler.onWebSocketConnect(session); + + verify(session, after(8000).atLeast(1)).close(); + verify(remoteEndpoint, after(5000)).sendString(argThat(x -> { + try { + assertNotNull(x); + ObjectMapper mapper = new ObjectMapper(); + JsonNode json = mapper.readTree(x); + + assertNotNull(json); + assertTrue(json.has("type")); + assertEquals(json.get("type").asText(), "AUTH_CHALLENGE"); + return true; + } catch (JsonMappingException e) { + e.printStackTrace(); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + return false; + })); + } } From f55ce6265a06d16c9ea1157b2a533edf40567e56 Mon Sep 17 00:00:00 2001 From: Huibert Alblas Date: Fri, 4 Jul 2025 10:15:15 +0200 Subject: [PATCH 04/10] Fixed authentication error response --- .../pulsar/websocket/AbstractWebSocketHandler.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java index 9310e7694db0d..dd7b7db618b5f 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java @@ -173,13 +173,12 @@ private boolean checkAuthentication(ServletUpgradeResponse response) { } catch (javax.naming.AuthenticationException e) { log.warn("[{}:{}] Failed to authenticated WebSocket client {} on topic {}: {}", request.getRemoteAddr(), request.getRemotePort(), authRole, topic, e.getMessage()); - try { - response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Failed to authenticate"); - } catch (IOException e1) { - log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(), - e1.getMessage(), e1); - } - return false; + } + try { + response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Failed to authenticate"); + } catch (IOException e1) { + log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(), + e1.getMessage(), e1); } return false; } From a072be7fbc5be8a8e37be55874b0a733e18d14bb Mon Sep 17 00:00:00 2001 From: Huibert Alblas Date: Fri, 4 Jul 2025 16:08:13 +0200 Subject: [PATCH 05/10] Added option for Anonymous users --- .../pulsar/websocket/AbstractWebSocketHandler.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java index dd7b7db618b5f..ab8d43c53c33d 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java @@ -85,6 +85,7 @@ public abstract class AbstractWebSocketHandler extends PulsarWebsocketDecoder im protected final Map queryParams; protected final ObjectReader consumerCommandReader = ObjectMapperFactory.getMapper().reader().forType(ConsumerCommand.class); + private final String anonymousUserRole; private AuthenticationState authState; private AuthenticationDataSource authData; @@ -101,6 +102,7 @@ public AbstractWebSocketHandler(WebSocketService service, ServletUpgradeResponse response) { this.service = service; this.request = new WebSocketHttpServletRequestWrapper(request); + this.anonymousUserRole = service.getConfig().getAnonymousUserRole(); this.queryParams = new TreeMap<>(); request.getParameterMap().forEach((key, values) -> { @@ -167,6 +169,14 @@ private boolean checkAuthentication(ServletUpgradeResponse response) { // expected here. } } + + if (StringUtils.isNotBlank(anonymousUserRole)) { + authRole = anonymousUserRole; + if (log.isDebugEnabled()) { + log.debug("Anonymous authentication succeded"); + } + return true; + } } log.info("[{}:{}] Authenticated WebSocket client {} on topic {}", request.getRemoteAddr(), request.getRemotePort(), authRole, topic); From 3ced33bbcf10e24f8fb42988aa1a7932a4eae86a Mon Sep 17 00:00:00 2001 From: Huibert Alblas Date: Fri, 4 Jul 2025 16:39:33 +0200 Subject: [PATCH 06/10] Fixed bug in case of missing config --- .../pulsar/websocket/AbstractWebSocketHandler.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java index ab8d43c53c33d..4673432a1fbeb 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java @@ -31,6 +31,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ExecutionException; @@ -85,7 +86,7 @@ public abstract class AbstractWebSocketHandler extends PulsarWebsocketDecoder im protected final Map queryParams; protected final ObjectReader consumerCommandReader = ObjectMapperFactory.getMapper().reader().forType(ConsumerCommand.class); - private final String anonymousUserRole; + private final Optional anonymousUserRole; private AuthenticationState authState; private AuthenticationDataSource authData; @@ -102,7 +103,7 @@ public AbstractWebSocketHandler(WebSocketService service, ServletUpgradeResponse response) { this.service = service; this.request = new WebSocketHttpServletRequestWrapper(request); - this.anonymousUserRole = service.getConfig().getAnonymousUserRole(); + this.anonymousUserRole = service.getAuthenticationService().getAnonymousUserRole(); this.queryParams = new TreeMap<>(); request.getParameterMap().forEach((key, values) -> { @@ -170,8 +171,8 @@ private boolean checkAuthentication(ServletUpgradeResponse response) { } } - if (StringUtils.isNotBlank(anonymousUserRole)) { - authRole = anonymousUserRole; + if (StringUtils.isNotBlank(anonymousUserRole.orElse(""))) { + authRole = anonymousUserRole.get(); if (log.isDebugEnabled()) { log.debug("Anonymous authentication succeded"); } From 3fafc86ee2bb8b79ac4ce8483808ee052475197f Mon Sep 17 00:00:00 2001 From: Huibert Alblas Date: Fri, 4 Jul 2025 17:06:24 +0200 Subject: [PATCH 07/10] FIxed bug in case of missing AuthenticationService --- .../apache/pulsar/websocket/AbstractWebSocketHandler.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java index 4673432a1fbeb..fbb409868e45c 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java @@ -86,7 +86,6 @@ public abstract class AbstractWebSocketHandler extends PulsarWebsocketDecoder im protected final Map queryParams; protected final ObjectReader consumerCommandReader = ObjectMapperFactory.getMapper().reader().forType(ConsumerCommand.class); - private final Optional anonymousUserRole; private AuthenticationState authState; private AuthenticationDataSource authData; @@ -103,7 +102,6 @@ public AbstractWebSocketHandler(WebSocketService service, ServletUpgradeResponse response) { this.service = service; this.request = new WebSocketHttpServletRequestWrapper(request); - this.anonymousUserRole = service.getAuthenticationService().getAnonymousUserRole(); this.queryParams = new TreeMap<>(); request.getParameterMap().forEach((key, values) -> { @@ -171,7 +169,8 @@ private boolean checkAuthentication(ServletUpgradeResponse response) { } } - if (StringUtils.isNotBlank(anonymousUserRole.orElse(""))) { + Optional anonymousUserRole = service.getAuthenticationService().getAnonymousUserRole(); + if (anonymousUserRole.isPresent() && StringUtils.isNotBlank(anonymousUserRole.get())) { authRole = anonymousUserRole.get(); if (log.isDebugEnabled()) { log.debug("Anonymous authentication succeded"); From 1265de41949c774cda74eb0a7ffc5908d06973d7 Mon Sep 17 00:00:00 2001 From: Huibert Alblas Date: Fri, 25 Jul 2025 14:39:05 +0200 Subject: [PATCH 08/10] Order imports alphabetically --- .../apache/pulsar/websocket/AbstractWebSocketHandlerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java index b03e9406fd5b8..8ca82737e1150 100644 --- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java +++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java @@ -29,6 +29,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; +import com.google.common.base.Splitter; import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; @@ -50,7 +51,6 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Splitter; import lombok.Getter; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationState; From 52c65b036e400d6dede8590f4987c77d24c8a7b1 Mon Sep 17 00:00:00 2001 From: Huibert Alblas Date: Fri, 25 Jul 2025 14:42:43 +0200 Subject: [PATCH 09/10] Order imports alphabetically --- .../pulsar/websocket/AbstractWebSocketHandlerTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java index 8ca82737e1150..7ffdb61a225df 100644 --- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java +++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java @@ -29,6 +29,10 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Splitter; import java.io.IOException; import java.lang.reflect.Field; @@ -47,10 +51,6 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import lombok.Cleanup; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Getter; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationState; From 5f16532fb3583f8014768a909916e950027d37bc Mon Sep 17 00:00:00 2001 From: Huibert Alblas Date: Fri, 25 Jul 2025 15:33:26 +0200 Subject: [PATCH 10/10] Fixed stylecheck issues --- .../websocket/AbstractWebSocketHandlerTest.java | 1 - .../data/protocol/PulsarWebsocketDecoderTest.java | 11 +++++++---- .../src/test/resources/AUTH_RESPONSE.json | 11 +++++++++++ 3 files changed, 18 insertions(+), 5 deletions(-) create mode 100644 pulsar-websocket/src/test/resources/AUTH_RESPONSE.json diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java index 59862a3f1bf79..73d68a86bfef8 100644 --- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java +++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java @@ -489,7 +489,6 @@ public void testDropSessionAfterTokenExpiration() WebSocketProxyConfiguration webSocketProxyConfiguration = new WebSocketProxyConfiguration(); webSocketProxyConfiguration.setAuthenticationEnabled(true); webSocketProxyConfiguration.setAuthenticationRefreshCheckSeconds(1); - WebSocketService webSocketService = spy(new WebSocketService(webSocketProxyConfiguration)); HttpServletRequest httpServletRequest = mock(HttpServletRequest.class); diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/data/protocol/PulsarWebsocketDecoderTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/data/protocol/PulsarWebsocketDecoderTest.java index 77565e1489b44..5f905f981906e 100644 --- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/data/protocol/PulsarWebsocketDecoderTest.java +++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/data/protocol/PulsarWebsocketDecoderTest.java @@ -18,12 +18,15 @@ */ package org.apache.pulsar.websocket.data.protocol; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import lombok.Getter; import org.apache.pulsar.websocket.data.CommandAuthResponse; import org.testng.Assert; import org.testng.annotations.Test; -import lombok.Getter; - public class PulsarWebsocketDecoderTest { static class MockedPulsarWebsocketDecoder extends PulsarWebsocketDecoder { @@ -39,8 +42,8 @@ public void handleAuthResponse(CommandAuthResponse commandAuthResponse) { @Test public void testHandleAuthResponseWithoutClientVersion() throws Exception { - String message = "{\"type\" : \"AUTH_RESPONSE\", \"authResponse\" : {\"clientVersion\" : \"v21\", \"protocolVersion\" : 21, \"response\" : {\"authMethodName\":\"token\", \"authData\": \"NewAccessToken\"}}}"; - + Path messagePath = Paths.get(this.getClass().getClassLoader().getResource("AUTH_RESPONSE.json").toURI()); + String message = Files.readString(messagePath, StandardCharsets.UTF_8); MockedPulsarWebsocketDecoder websocketDecoder = new MockedPulsarWebsocketDecoder(); websocketDecoder.onWebSocketText(message); diff --git a/pulsar-websocket/src/test/resources/AUTH_RESPONSE.json b/pulsar-websocket/src/test/resources/AUTH_RESPONSE.json new file mode 100644 index 0000000000000..1dd0bcdac9abe --- /dev/null +++ b/pulsar-websocket/src/test/resources/AUTH_RESPONSE.json @@ -0,0 +1,11 @@ +{ + "type": "AUTH_RESPONSE", + "authResponse": { + "clientVersion": "v21", + "protocolVersion": 21, + "response": { + "authMethodName": "token", + "authData": "NewAccessToken" + } + } +} \ No newline at end of file