Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.servlet.http.HttpServletRequest;
Expand Down Expand Up @@ -117,6 +118,10 @@ private AuthenticationProvider getAuthProvider(String authMethodName) throws Aut
return providerToUse;
}

public Set<String> getAuthMethodNames() {
return providers.keySet();
}

public boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response)
throws Exception {
String authMethodName = getAuthMethodName(request);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -233,31 +233,9 @@ public void onWebSocketConnect(Session session) {
}
}

@Override
public void onWebSocketText(String message) {
super.onWebSocketText(message);

try {
ConsumerCommand command = consumerCommandReader.readValue(message);
if ("permit".equals(command.type)) {
handlePermit(command);
} else if ("unsubscribe".equals(command.type)) {
handleUnsubscribe(command);
} else if ("negativeAcknowledge".equals(command.type)) {
handleNack(command);
} else if ("isEndOfTopic".equals(command.type)) {
handleEndOfTopic();
} else {
handleAck(command);
}
} catch (IOException e) {
log.warn("Failed to deserialize message id: {}", message, e);
close(WebSocketError.FailedToDeserializeFromJSON);
}
}

// Check and notify consumer if reached end of topic.
private void handleEndOfTopic() {
@Override
protected void handleEndOfTopic() {
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received check reach the end of topic request from {} ", consumer.getTopic(),
subscription, getRemote().getInetSocketAddress().toString());
Expand Down Expand Up @@ -288,12 +266,18 @@ public void writeSuccess() {
}
}

private void handleUnsubscribe(ConsumerCommand command) throws PulsarClientException {
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received unsubscribe request from {} ", consumer.getTopic(),
subscription, getRemote().getInetSocketAddress().toString());
@Override
protected void handleUnsubscribe(ConsumerCommand command) {
try {
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received unsubscribe request from {} ", consumer.getTopic(),
subscription, getRemote().getInetSocketAddress().toString());
}
consumer.unsubscribe();
} catch (PulsarClientException e) {
log.warn("Failed to deserialize message id: {}", command, e);
close(WebSocketError.FailedToDeserializeFromJSON);
}
consumer.unsubscribe();
}

private void checkResumeReceive() {
Expand All @@ -306,55 +290,73 @@ private void checkResumeReceive() {
}
}

private void handleAck(ConsumerCommand command) throws IOException {
// We should have received an ack
MessageId msgId = MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId));
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(),
subscription, msgId, getRemote().getInetSocketAddress().toString());
}
@Override
protected void handleAck(ConsumerCommand command) {
try {
// We should have received an ack
MessageId msgId = MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId));
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(),
subscription, msgId, getRemote().getInetSocketAddress().toString());
}

MessageId originalMsgId = messageIdCache.asMap().remove(command.messageId);
if (originalMsgId != null) {
consumer.acknowledgeAsync(originalMsgId).thenAccept(consumer -> numMsgsAcked.increment());
} else {
consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
}
MessageId originalMsgId = messageIdCache.asMap().remove(command.messageId);
if (originalMsgId != null) {
consumer.acknowledgeAsync(originalMsgId).thenAccept(consumer -> numMsgsAcked.increment());
} else {
consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
}

checkResumeReceive();
checkResumeReceive();
} catch (IOException e) {
log.warn("Failed to deserialize message id: {}", command, e);
close(WebSocketError.FailedToDeserializeFromJSON);
}
}

private void handleNack(ConsumerCommand command) throws IOException {
MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
topic.toString());
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received negative ack request of message {} from {} ", consumer.getTopic(),
subscription, msgId, getRemote().getInetSocketAddress().toString());
}
@Override
protected void handleNack(ConsumerCommand command) {
try {
MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
topic.toString());
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received negative ack request of message {} from {} ", consumer.getTopic(),
subscription, msgId, getRemote().getInetSocketAddress().toString());
}

MessageId originalMsgId = messageIdCache.asMap().remove(command.messageId);
if (originalMsgId != null) {
consumer.negativeAcknowledge(originalMsgId);
} else {
consumer.negativeAcknowledge(msgId);
MessageId originalMsgId = messageIdCache.asMap().remove(command.messageId);
if (originalMsgId != null) {
consumer.negativeAcknowledge(originalMsgId);
} else {
consumer.negativeAcknowledge(msgId);
}
checkResumeReceive();
} catch (IOException e) {
log.warn("Failed to deserialize message id: {}", command, e);
close(WebSocketError.FailedToDeserializeFromJSON);
}
checkResumeReceive();
}

private void handlePermit(ConsumerCommand command) throws IOException {
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received {} permits request from {} ", consumer.getTopic(),
subscription, command.permitMessages, getRemote().getInetSocketAddress().toString());
}
if (command.permitMessages == null) {
throw new IOException("Missing required permitMessages field for 'permit' command");
}
if (this.pullMode) {
int pending = pendingMessages.getAndAdd(-command.permitMessages);
if (pending >= 0) {
// Resume delivery
receiveMessage();
@Override
protected void handlePermit(ConsumerCommand command) {
try {
if (log.isDebugEnabled()) {
log.debug("[{}/{}] Received {} permits request from {} ", consumer.getTopic(),
subscription, command.permitMessages, getRemote().getInetSocketAddress().toString());
}
if (command.permitMessages == null) {
throw new IOException("Missing required permitMessages field for 'permit' command");
}
if (this.pullMode) {
int pending = pendingMessages.getAndAdd(-command.permitMessages);
if (pending >= 0) {
// Resume delivery
receiveMessage();
}
}
} catch (IOException e) {
log.warn("Failed to deserialize message id: {}", command, e);
close(WebSocketError.FailedToDeserializeFromJSON);
}
}

Expand Down Expand Up @@ -528,4 +530,4 @@ public String extractSubscription(HttpServletRequest request) {
}

private static final Logger log = LoggerFactory.getLogger(ConsumerHandler.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import static org.apache.pulsar.websocket.WebSocketError.FailedToDeserializeFromJSON;
import static org.apache.pulsar.websocket.WebSocketError.PayloadEncodingError;
import static org.apache.pulsar.websocket.WebSocketError.UnknownError;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectReader;
import com.google.common.base.Enums;
import java.io.IOException;
import java.time.format.DateTimeParseException;
Expand All @@ -54,6 +51,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.DateFormatter;
Expand Down Expand Up @@ -91,8 +89,6 @@ public class ProducerHandler extends AbstractWebSocketHandler {

public static final List<Long> ENTRY_LATENCY_BUCKETS_USEC = Collections.unmodifiableList(Arrays.asList(
500L, 1_000L, 5_000L, 10_000L, 20_000L, 50_000L, 100_000L, 200_000L, 1000_000L));
private final ObjectReader producerMessageReader =
ObjectMapperFactory.getMapper().reader().forType(ProducerMessage.class);

public ProducerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
super(service, request, response);
Expand Down Expand Up @@ -156,29 +152,26 @@ public void close() throws IOException {
}

@Override
public void onWebSocketText(String message) {
protected void handleMessage(ProducerMessage sendRequest) {
if (log.isDebugEnabled()) {
log.debug("[{}] Received new message from producer {} ", producer.getTopic(),
getRemote().getInetSocketAddress().toString());
}
ProducerMessage sendRequest;
byte[] rawPayload = null;
String requestContext = null;
try {
sendRequest = producerMessageReader.readValue(message);
requestContext = sendRequest.context;
if (sendRequest.payload == null) {
// Null payload
sendAckResponse(new ProducerAck(PayloadEncodingError, "Empty payload", null,
requestContext));
return;
}
rawPayload = Base64.getDecoder().decode(sendRequest.payload);
} catch (IOException e) {
sendAckResponse(new ProducerAck(FailedToDeserializeFromJSON, e.getMessage(), null, null));
return;
} catch (IllegalArgumentException e) {
String msg = format("Invalid Base64 message-payload error=%s", e.getMessage());
sendAckResponse(new ProducerAck(PayloadEncodingError, msg, null, requestContext));
return;
} catch (NullPointerException e) {
// Null payload
sendAckResponse(new ProducerAck(PayloadEncodingError, e.getMessage(), null, requestContext));
return;
}

final long msgSize = rawPayload.length;
Expand Down Expand Up @@ -532,4 +525,4 @@ private void printLogIfSettingDiscardedCompressionParams() {

private static final Logger log = LoggerFactory.getLogger(ProducerHandler.class);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.websocket.data.ConsumerCommand;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.apache.pulsar.websocket.data.EndOfTopicResponse;
import org.eclipse.jetty.websocket.api.Session;
Expand Down Expand Up @@ -210,17 +209,6 @@ public void onWebSocketConnect(Session session) {
public void onWebSocketText(String message) {
super.onWebSocketText(message);

try {
ConsumerCommand command = consumerCommandReader.readValue(message);
if ("isEndOfTopic".equals(command.type)) {
handleEndOfTopic();
return;
}
} catch (IOException e) {
log.warn("Failed to deserialize message id: {}", message, e);
close(WebSocketError.FailedToDeserializeFromJSON);
}

// We should have received an ack
// but reader doesn't send an ack to broker here because already reader did

Expand All @@ -232,7 +220,8 @@ public void onWebSocketText(String message) {
}

// Check and notify reader if reached end of topic.
private void handleEndOfTopic() {
@Override
protected void handleEndOfTopic() {
try {
String msg = objectWriter().writeValueAsString(
new EndOfTopicResponse(reader.hasReachedEndOfTopic()));
Expand Down Expand Up @@ -353,4 +342,4 @@ private MessageId getMessageId() throws IOException {

private static final Logger log = LoggerFactory.getLogger(ReaderHandler.class);

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket.data;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class AuthChallenge {
public Challenge challenge;
public int protocolVersion;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket.data;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class AuthData {
public String authMethodName;
public String authData;

}
Loading