diff --git a/.github/workflows/socketio_e2e.yml b/.github/workflows/socketio_e2e.yml index 73d0954b3..cb1b237d6 100644 --- a/.github/workflows/socketio_e2e.yml +++ b/.github/workflows/socketio_e2e.yml @@ -7,10 +7,6 @@ on: branches: [ "main" ] paths: - 'sdk/**' - pull_request_target: - branches: [ "main" ] - paths: - - 'sdk/**' env: NODE_VERSION: '18.x' # set this to the node version to use jobs: @@ -49,4 +45,4 @@ jobs: SocketIoPort: 3000 run: | pushd sdk/webpubsub-socketio-extension - yarn run test \ No newline at end of file + yarn run test diff --git a/tests/integration-tests/go/client_connect_tests.go b/tests/integration-tests/go/client_connect_tests.go new file mode 100644 index 000000000..2ca171a9f --- /dev/null +++ b/tests/integration-tests/go/client_connect_tests.go @@ -0,0 +1,246 @@ +package integration_tests + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/internal/recording" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal/v3/testutil" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/webpubsub/armwebpubsub" + "github.com/stretchr/testify/suite" + "golang.org/x/net/websocket" +) + +type ClientConnectTestsSuite struct { + suite.Suite + serviceClient *armwebpubsub.Client +} + +func (suite *ClientConnectTestsSuite) SetupSuite() { + // Setup code here + connectionString := recording.GetEnvVariable("WEB_PUBSUB_CONNECTION_STRING", "") + if connectionString == "" { + suite.T().Fatal("Please set the WEB_PUBSUB_CONNECTION_STRING environment variable.") + } + client, err := armwebpubsub.NewClient(connectionString, nil) + if err != nil { + suite.T().Fatal(err) + } + suite.serviceClient = client +} + +func (suite *ClientConnectTestsSuite) TearDownSuite() { + // Teardown code here +} + +func (suite *ClientConnectTestsSuite) TestSimpleWebSocketClientCanConnectAndReceiveMessages() { + options := &armwebpubsub.ClientOptions{} + url, err := suite.serviceClient.GetClientAccessUri(context.Background(), options) + if err != nil { + suite.T().Fatal(err) + } + + client, err := NewWebSocketClient(url, IsSimpleClientEndSignal) + if err != nil { + suite.T().Fatal(err) + } + + defer client.Stop() + + textContent := "Hello" + suite.serviceClient.SendToAll(context.Background(), textContent, armwebpubsub.ContentTypeTextPlain) + + jsonContent := map[string]string{"hello": "world"} + jsonData, _ := json.Marshal(jsonContent) + suite.serviceClient.SendToAll(context.Background(), jsonData, armwebpubsub.ContentTypeApplicationJSON) + + binaryContent := []byte("Hello") + suite.serviceClient.SendToAll(context.Background(), binaryContent, armwebpubsub.ContentTypeApplicationOctetStream) + + suite.serviceClient.SendToAll(context.Background(), GetEndSignalBytes(), armwebpubsub.ContentTypeApplicationOctetStream) + + client.WaitForConnected() + client.LifetimeTask() + + frames := client.ReceivedFrames + suite.Equal(3, len(frames)) + suite.Equal(textContent, frames[0].MessageAsString) + suite.Equal(string(jsonData), frames[1].MessageAsString) + suite.Equal(binaryContent, frames[2].MessageBytes) +} + +func (suite *ClientConnectTestsSuite) TestWebSocketClientWithInitialGroupCanConnectAndReceiveGroupMessages() { + options := &armwebpubsub.ClientOptions{} + group := "GroupA" + url, err := suite.serviceClient.GetClientAccessUri(context.Background(), options, armwebpubsub.GetClientAccessUriOptions{Groups: []string{group}}) + if err != nil { + suite.T().Fatal(err) + } + + client, err := NewWebSocketClient(url, IsSimpleClientEndSignal) + if err != nil { + suite.T().Fatal(err) + } + + defer client.Stop() + + textContent := "Hello" + suite.serviceClient.SendToGroup(context.Background(), group, textContent, armwebpubsub.ContentTypeTextPlain) + + jsonContent := map[string]string{"hello": "world"} + jsonData, _ := json.Marshal(jsonContent) + suite.serviceClient.SendToGroup(context.Background(), group, jsonData, armwebpubsub.ContentTypeApplicationJSON) + + binaryContent := []byte("Hello") + suite.serviceClient.SendToGroup(context.Background(), group, binaryContent, armwebpubsub.ContentTypeApplicationOctetStream) + + suite.serviceClient.SendToGroup(context.Background(), group, GetEndSignalBytes(), armwebpubsub.ContentTypeApplicationOctetStream) + + client.WaitForConnected() + client.LifetimeTask() + + frames := client.ReceivedFrames + suite.Equal(3, len(frames)) + suite.Equal(textContent, frames[0].MessageAsString) + suite.Equal(string(jsonData), frames[1].MessageAsString) + suite.Equal(binaryContent, frames[2].MessageBytes) +} + +func (suite *ClientConnectTestsSuite) TestSubprotocolWebSocketClientCanConnectAndReceiveMessages() { + options := &armwebpubsub.ClientOptions{} + url, err := suite.serviceClient.GetClientAccessUri(context.Background(), options) + if err != nil { + suite.T().Fatal(err) + } + + client, err := NewWebSocketClient(url, IsSubprotocolClientEndSignal, func(ws *websocket.Config) { + ws.Protocol = []string{"json.webpubsub.azure.v1"} + }) + if err != nil { + suite.T().Fatal(err) + } + + defer client.Stop() + + textContent := "Hello" + suite.serviceClient.SendToAll(context.Background(), textContent, armwebpubsub.ContentTypeTextPlain) + + jsonContent := map[string]string{"hello": "world"} + jsonData, _ := json.Marshal(jsonContent) + suite.serviceClient.SendToAll(context.Background(), jsonData, armwebpubsub.ContentTypeApplicationJSON) + + binaryContent := []byte("Hello") + suite.serviceClient.SendToAll(context.Background(), binaryContent, armwebpubsub.ContentTypeApplicationOctetStream) + + suite.serviceClient.SendToAll(context.Background(), GetEndSignalBytes(), armwebpubsub.ContentTypeApplicationOctetStream) + + client.WaitForConnected() + client.LifetimeTask() + + frames := client.ReceivedFrames + suite.Equal(4, len(frames)) + + var connected ConnectedMessage + json.Unmarshal([]byte(frames[0].MessageAsString), &connected) + suite.NotNil(connected) + suite.Equal("connected", connected.Event) + + suite.Equal(string(jsonData), frames[1].MessageAsString) + suite.Equal(string(jsonData), frames[2].MessageAsString) + suite.Equal(binaryContent, frames[3].MessageBytes) +} + +func IsSimpleClientEndSignal(frame WebSocketFrame) bool { + bytes := frame.MessageBytes + return len(bytes) == 3 && bytes[0] == 5 && bytes[1] == 1 && bytes[2] == 1 +} + +func IsSubprotocolClientEndSignal(frame WebSocketFrame) bool { + return frame.MessageAsString == `{"type":"message","from":"server","dataType":"binary","data":"BQEB"}` +} + +func GetEndSignalBytes() []byte { + return []byte{5, 1, 1} +} + +type ConnectedMessage struct { + Type string `json:"type"` + Event string `json:"event"` + UserId string `json:"userId"` + ConnectionId string `json:"connectionId"` +} + +type WebSocketFrame struct { + MessageAsString string + MessageBytes []byte + MessageType int +} + +type WebSocketClient struct { + ws *websocket.Conn + uri string + isEndSignal func(WebSocketFrame) bool + ReceivedFrames []WebSocketFrame +} + +func NewWebSocketClient(uri string, isEndSignal func(WebSocketFrame) bool, configureOptions ...func(*websocket.Config)) (*WebSocketClient, error) { + config, err := websocket.NewConfig(uri, uri) + if err != nil { + return nil, err + } + for _, option := range configureOptions { + option(config) + } + ws, err := websocket.DialConfig(config) + if err != nil { + return nil, err + } + client := &WebSocketClient{ + ws: ws, + uri: uri, + isEndSignal: isEndSignal, + } + go client.receiveLoop() + return client, nil +} + +func (client *WebSocketClient) Stop() { + client.ws.Close() +} + +func (client *WebSocketClient) WaitForConnected() { + // Implement wait for connected logic if needed +} + +func (client *WebSocketClient) LifetimeTask() { + // Implement lifetime task logic if needed +} + +func (client *WebSocketClient) receiveLoop() { + for { + var msg = make([]byte, 512) + n, err := client.ws.Read(msg) + if err != nil { + return + } + frame := WebSocketFrame{ + MessageBytes: msg[:n], + MessageAsString: string(msg[:n]), + } + if client.isEndSignal(frame) { + return + } + client.ReceivedFrames = append(client.ReceivedFrames, frame) + } +} + +func TestClientConnectTestsSuite(t *testing.T) { + suite.Run(t, new(ClientConnectTestsSuite)) +} diff --git a/tests/integration-tests/go/project_setup_test.go b/tests/integration-tests/go/project_setup_test.go new file mode 100644 index 000000000..ff0c262fa --- /dev/null +++ b/tests/integration-tests/go/project_setup_test.go @@ -0,0 +1,19 @@ +package integration_tests + +import ( + "context" + "fmt" + "testing" + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/internal/recording" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal/v3/testutil" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/webpubsub/armwebpubsub" + "github.com/stretchr/testify/suite" +) + +// Placeholder for the actual test cases and logic +// Since the original file is a project file, it does not contain test cases or logic to translate +// You can add your test cases here diff --git a/tests/integration-tests/go/task_extensions_test.go b/tests/integration-tests/go/task_extensions_test.go new file mode 100644 index 000000000..b9c20e81b --- /dev/null +++ b/tests/integration-tests/go/task_extensions_test.go @@ -0,0 +1,40 @@ +package integration_tests + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func OrTimeout(task func() error, millisecondsDelay int) error { + timeout := time.After(time.Duration(millisecondsDelay) * time.Millisecond) + done := make(chan error, 1) + + go func() { + done <- task() + }() + + select { + case err := <-done: + return err + case <-timeout: + return errors.New("timeout") + } +} + +func TestOrTimeout(t *testing.T) { + task := func() error { + time.Sleep(1 * time.Second) + return nil + } + + err := OrTimeout(task, 500) + assert.NotNil(t, err) + assert.Equal(t, "timeout", err.Error()) + + err = OrTimeout(task, 2000) + assert.Nil(t, err) +} diff --git a/tests/integration-tests/go/test_environment_test.go b/tests/integration-tests/go/test_environment_test.go new file mode 100644 index 000000000..6df1409fb --- /dev/null +++ b/tests/integration-tests/go/test_environment_test.go @@ -0,0 +1,42 @@ +package integration_tests + +import ( + "context" + "fmt" + "os" + "testing" + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/internal/recording" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal/v3/testutil" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/webpubsub/armwebpubsub" + "github.com/stretchr/testify/suite" +) + +type TestEnvironment struct { + suite.Suite + ConnectionString string +} + +func (te *TestEnvironment) SetupSuite() { + // TODO: provision the resources? + // What if multiple different resources needed? + if err := recording.LoadEnv(); err != nil { + te.T().Fatal(err) + } + + te.ConnectionString = os.Getenv("WEB_PUBSUB_CONNECTION_STRING") + if te.ConnectionString == "" { + te.T().Fatal("Please set the WEB_PUBSUB_CONNECTION_STRING environment variable.") + } +} + +func (te *TestEnvironment) TearDownSuite() { + // Clean up any resources if necessary +} + +func TestMain(m *testing.M) { + suite.Run(&testing.T{}, new(TestEnvironment)) +} diff --git a/tests/integration-tests/java/ClientConnectTests.java b/tests/integration-tests/java/ClientConnectTests.java new file mode 100644 index 000000000..f61c0a655 --- /dev/null +++ b/tests/integration-tests/java/ClientConnectTests.java @@ -0,0 +1,220 @@ +import com.azure.core.http.HttpClient; +import com.azure.core.http.HttpHeaderName; +import com.azure.core.http.policy.HttpLogDetailLevel; +import com.azure.core.http.policy.HttpLogOptions; +import com.azure.core.http.rest.RequestOptions; +import com.azure.core.http.rest.Response; +import com.azure.core.test.TestMode; +import com.azure.core.test.TestProxyTestBase; +import com.azure.core.test.annotation.DoNotRecord; +import com.azure.core.util.BinaryData; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.messaging.webpubsub.models.GetClientAccessTokenOptions; +import com.azure.messaging.webpubsub.models.WebPubSubContentType; +import com.azure.messaging.webpubsub.models.WebPubSubPermission; +import com.nimbusds.jwt.JWT; +import com.nimbusds.jwt.JWTClaimsSet; +import com.nimbusds.jwt.JWTParser; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.time.Duration; +import java.util.List; +import java.util.ArrayList; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +public class ClientConnectTests { + + @Test + public void simpleWebSocketClientCanConnectAndReceiveMessages() throws Exception { + WebPubSubServiceClientOptions options = new WebPubSubServiceClientOptions(); + WebPubSubServiceClient serviceClient = new WebPubSubServiceClient(TestEnvironment.getConnectionString(), "simpleWebSocketClientCanConnectAndReceiveMessages", options); + + String url = serviceClient.getClientAccessUri().block(); + WebSocketClient client = new WebSocketClient(url, this::isSimpleClientEndSignal); + + client.waitForConnected().get(5, TimeUnit.SECONDS); + + String textContent = "Hello"; + serviceClient.sendToAll(textContent, WebPubSubContentType.TEXT_PLAIN).block(); + BinaryData jsonContent = BinaryData.fromObject(new JsonObject().put("hello", "world")); + serviceClient.sendToAll(jsonContent, WebPubSubContentType.APPLICATION_JSON).block(); + BinaryData binaryContent = BinaryData.fromString("Hello"); + serviceClient.sendToAll(binaryContent, WebPubSubContentType.APPLICATION_OCTET_STREAM).block(); + + serviceClient.sendToAll(BinaryData.fromBytes(getEndSignalBytes()), WebPubSubContentType.APPLICATION_OCTET_STREAM).block(); + + client.lifetimeTask().get(5, TimeUnit.SECONDS); + List frames = client.getReceivedFrames(); + + Assertions.assertEquals(3, frames.size()); + Assertions.assertEquals(textContent, frames.get(0).getMessageAsString()); + Assertions.assertEquals(jsonContent.toString(), frames.get(1).getMessageAsString()); + Assertions.assertArrayEquals(binaryContent.toBytes(), frames.get(2).getMessageBytes()); + } + + @Test + public void webSocketClientWithInitialGroupCanConnectAndReceiveGroupMessages() throws Exception { + WebPubSubServiceClientOptions options = new WebPubSubServiceClientOptions(); + WebPubSubServiceClient serviceClient = new WebPubSubServiceClient(TestEnvironment.getConnectionString(), "webSocketClientWithInitialGroupCanConnectAndReceiveGroupMessages", options); + + String group = "GroupA"; + String url = serviceClient.getClientAccessUri(new GetClientAccessTokenOptions().setGroups(List.of(group))).block(); + WebSocketClient client = new WebSocketClient(url, this::isSimpleClientEndSignal); + + client.waitForConnected().get(5, TimeUnit.SECONDS); + + String textContent = "Hello"; + serviceClient.sendToGroup(group, textContent, WebPubSubContentType.TEXT_PLAIN).block(); + BinaryData jsonContent = BinaryData.fromObject(new JsonObject().put("hello", "world")); + serviceClient.sendToGroup(group, jsonContent, WebPubSubContentType.APPLICATION_JSON).block(); + BinaryData binaryContent = BinaryData.fromString("Hello"); + serviceClient.sendToGroup(group, binaryContent, WebPubSubContentType.APPLICATION_OCTET_STREAM).block(); + + serviceClient.sendToGroup(group, BinaryData.fromBytes(getEndSignalBytes()), WebPubSubContentType.APPLICATION_OCTET_STREAM).block(); + + client.lifetimeTask().get(5, TimeUnit.SECONDS); + List frames = client.getReceivedFrames(); + + Assertions.assertEquals(3, frames.size()); + Assertions.assertEquals(textContent, frames.get(0).getMessageAsString()); + Assertions.assertEquals(jsonContent.toString(), frames.get(1).getMessageAsString()); + Assertions.assertArrayEquals(binaryContent.toBytes(), frames.get(2).getMessageBytes()); + } + + @Test + public void subprotocolWebSocketClientCanConnectAndReceiveMessages() throws Exception { + WebPubSubServiceClientOptions options = new WebPubSubServiceClientOptions(); + WebPubSubServiceClient serviceClient = new WebPubSubServiceClient(TestEnvironment.getConnectionString(), "subprotocolWebSocketClientCanConnectAndReceiveMessages", options); + + String url = serviceClient.getClientAccessUri().block(); + WebSocketClient client = new WebSocketClient(url, this::isSubprotocolClientEndSignal, ws -> ws.addSubprotocol("json.webpubsub.azure.v1")); + + client.waitForConnected().get(5, TimeUnit.SECONDS); + + String textContent = "Hello"; + serviceClient.sendToAll(textContent, WebPubSubContentType.TEXT_PLAIN).block(); + JsonObject jsonContent = new JsonObject().put("hello", "world"); + serviceClient.sendToAll(BinaryData.fromObject(jsonContent), WebPubSubContentType.APPLICATION_JSON).block(); + BinaryData binaryContent = BinaryData.fromString("Hello"); + serviceClient.sendToAll(binaryContent, WebPubSubContentType.APPLICATION_OCTET_STREAM).block(); + + serviceClient.sendToAll(BinaryData.fromBytes(getEndSignalBytes()), WebPubSubContentType.APPLICATION_OCTET_STREAM).block(); + + client.lifetimeTask().get(5, TimeUnit.SECONDS); + List frames = client.getReceivedFrames(); + + Assertions.assertEquals(4, frames.size()); + ConnectedMessage connected = BinaryData.fromString(frames.get(0).getMessageAsString()).toObject(ConnectedMessage.class); + Assertions.assertNotNull(connected); + Assertions.assertEquals("connected", connected.getEvent()); + Assertions.assertEquals(new JsonObject().put("type", "message").put("from", "server").put("dataType", "text").put("data", textContent).toString(), frames.get(1).getMessageAsString()); + Assertions.assertEquals(new JsonObject().put("type", "message").put("from", "server").put("dataType", "json").put("data", jsonContent).toString(), frames.get(2).getMessageAsString()); + Assertions.assertArrayEquals(new JsonObject().put("type", "message").put("from", "server").put("dataType", "binary").put("data", binaryContent.toBytes()).toString().getBytes(StandardCharsets.UTF_8), frames.get(3).getMessageBytes()); + } + + private static class ConnectedMessage { + private String type; + private String event; + private String userId; + private String connectionId; + + public String getType() { return type; } + public void setType(String type) { this.type = type; } + public String getEvent() { return event; } + public void setEvent(String event) { this.event = event; } + public String getUserId() { return userId; } + public void setUserId(String userId) { this.userId = userId; } + public String getConnectionId() { return connectionId; } + public void setConnectionId(String connectionId) { this.connectionId = connectionId; } + } + + private boolean isSimpleClientEndSignal(WebSocketFrame frame) { + byte[] bytes = frame.getMessageBytes(); + return bytes.length == 3 && bytes[0] == 5 && bytes[1] == 1 && bytes[2] == 1; + } + + private boolean isSubprotocolClientEndSignal(WebSocketFrame frame) { + return frame.getMessageAsString().equals(new JsonObject().put("type", "message").put("from", "server").put("dataType", "binary").put("data", "BQEB").toString()); + } + + private byte[] getEndSignalBytes() { + return new byte[] { 5, 1, 1 }; + } + + private static class WebSocketFrame { + private final String messageAsString; + private final byte[] messageBytes; + + public WebSocketFrame(byte[] bytes, String type) { + if (type.equals("text")) { + this.messageBytes = bytes; + this.messageAsString = new String(bytes, StandardCharsets.UTF_8); + } else if (type.equals("binary")) { + this.messageBytes = bytes; + this.messageAsString = null; + } else { + throw new UnsupportedOperationException(type); + } + } + + public String getMessageAsString() { return messageAsString; } + public byte[] getMessageBytes() { return messageBytes; } + } + + private static class WebSocketClient implements AutoCloseable { + private final WebSocket webSocket; + private final String uri; + private final Function isEndSignal; + private final List receivedFrames = new ArrayList<>(); + private final CompletableFuture lifetimeTask = new CompletableFuture<>(); + private final CompletableFuture waitForConnected = new CompletableFuture<>(); + + public WebSocketClient(String uri, Function isEndSignal) { + this(uri, isEndSignal, null); + } + + public WebSocketClient(String uri, Function isEndSignal, Function configureOptions) { + this.uri = uri; + this.isEndSignal = isEndSignal; + this.webSocket = new WebSocket(uri); + if (configureOptions != null) configureOptions.apply(webSocket); + connect(); + receiveLoop(); + } + + private void connect() { + webSocket.connect().thenRun(() -> waitForConnected.complete(null)); + } + + private void receiveLoop() { + webSocket.onMessage((data, isLast) -> { + WebSocketFrame frame = new WebSocketFrame(data, webSocket.getSubprotocol()); + if (isEndSignal.apply(frame)) { + lifetimeTask.complete(null); + } else { + receivedFrames.add(frame); + } + }); + } + + public CompletableFuture waitForConnected() { return waitForConnected; } + public CompletableFuture lifetimeTask() { return lifetimeTask; } + public List getReceivedFrames() { return receivedFrames; } + + @Override + public void close() throws Exception { + webSocket.close(); + } + } +} diff --git a/tests/integration-tests/java/JavaProjectSetup.java b/tests/integration-tests/java/JavaProjectSetup.java new file mode 100644 index 000000000..d383a6bfb --- /dev/null +++ b/tests/integration-tests/java/JavaProjectSetup.java @@ -0,0 +1,33 @@ +import com.azure.core.http.HttpClient; +import com.azure.core.http.HttpHeaderName; +import com.azure.core.http.policy.HttpLogDetailLevel; +import com.azure.core.http.policy.HttpLogOptions; +import com.azure.core.http.rest.RequestOptions; +import com.azure.core.http.rest.Response; +import com.azure.core.test.TestMode; +import com.azure.core.test.TestProxyTestBase; +import com.azure.core.test.annotation.DoNotRecord; +import com.azure.core.util.BinaryData; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.messaging.webpubsub.models.GetClientAccessTokenOptions; +import com.azure.messaging.webpubsub.models.WebPubSubContentType; +import com.azure.messaging.webpubsub.models.WebPubSubPermission; +import com.nimbusds.jwt.JWT; +import com.nimbusds.jwt.JWTClaimsSet; +import com.nimbusds.jwt.JWTParser; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.time.Duration; + +public class JavaProjectSetup { + // Placeholder for the actual test cases and logic + // Since the original file is a project file, it does not contain test cases or logic to translate + // You can add your test cases here +} diff --git a/tests/integration-tests/java/TaskExtensions.java b/tests/integration-tests/java/TaskExtensions.java new file mode 100644 index 000000000..bbe37d4b5 --- /dev/null +++ b/tests/integration-tests/java/TaskExtensions.java @@ -0,0 +1,11 @@ +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class TaskExtensions { + public static CompletableFuture orTimeout(CompletableFuture future, int millisecondsDelay) { + CompletableFuture timeoutFuture = new CompletableFuture<>(); + CompletableFuture.delayedExecutor(millisecondsDelay, TimeUnit.MILLISECONDS).execute(() -> timeoutFuture.completeExceptionally(new TimeoutException())); + return future.applyToEither(timeoutFuture, result -> result); + } +} diff --git a/tests/integration-tests/java/TestEnvironment.java b/tests/integration-tests/java/TestEnvironment.java new file mode 100644 index 000000000..b4c67d020 --- /dev/null +++ b/tests/integration-tests/java/TestEnvironment.java @@ -0,0 +1,51 @@ +import com.azure.core.http.HttpClient; +import com.azure.core.http.HttpHeaderName; +import com.azure.core.http.policy.HttpLogDetailLevel; +import com.azure.core.http.policy.HttpLogOptions; +import com.azure.core.http.rest.RequestOptions; +import com.azure.core.http.rest.Response; +import com.azure.core.test.TestMode; +import com.azure.core.test.TestProxyTestBase; +import com.azure.core.test.annotation.DoNotRecord; +import com.azure.core.util.BinaryData; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.messaging.webpubsub.models.GetClientAccessTokenOptions; +import com.azure.messaging.webpubsub.models.WebPubSubContentType; +import com.azure.messaging.webpubsub.models.WebPubSubPermission; +import com.nimbusds.jwt.JWT; +import com.nimbusds.jwt.JWTClaimsSet; +import com.nimbusds.jwt.JWTParser; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.time.Duration; +import io.github.cdimascio.dotenv.Dotenv; + +public class TestEnvironment { + public static String connectionString; + + @BeforeAll + public static void globalSetup() { + // TODO: provision the resources? + // What if multiple different resources needed? + Dotenv dotenv = Dotenv.load(); + + connectionString = dotenv.get("WEB_PUBSUB_CONNECTION_STRING"); + if (connectionString == null || connectionString.isEmpty()) { + throw new IllegalStateException("Please set the WEB_PUBSUB_CONNECTION_STRING environment variable."); + } + } + + @AfterAll + public static void globalTeardown() { + // Clean up any resources if necessary + } +} diff --git a/tests/integration-tests/javascript/clientConnectTests.js b/tests/integration-tests/javascript/clientConnectTests.js new file mode 100644 index 000000000..38ffa9afd --- /dev/null +++ b/tests/integration-tests/javascript/clientConnectTests.js @@ -0,0 +1,156 @@ +import { WebPubSubServiceClient } from '@azure/web-pubsub'; +import { isLiveMode, assertEnvironmentVariable } from '@azure-tools/test-recorder'; +import { Context } from 'mocha'; +import { assert } from 'chai'; +import ws from 'ws'; + +const { WebSocket } = ws; + +class ClientConnectTests { + async simpleWebSocketClientCanConnectAndReceiveMessages() { + const options = {}; + const serviceClient = new WebPubSubServiceClient(process.env.WEB_PUBSUB_CONNECTION_STRING, 'simpleWebSocketClientCanConnectAndReceiveMessages', options); + + const url = await serviceClient.getClientAccessUri(); + const client = new WebSocketClient(url, this.isSimpleClientEndSignal); + + await client.waitForConnected(); + + const textContent = 'Hello'; + await serviceClient.sendToAll(textContent, 'text/plain'); + const jsonContent = { hello: 'world' }; + await serviceClient.sendToAll(JSON.stringify(jsonContent), 'application/json'); + const binaryContent = Buffer.from('Hello'); + await serviceClient.sendToAll(binaryContent, 'application/octet-stream'); + + await serviceClient.sendToAll(this.getEndSignalBytes(), 'application/octet-stream'); + + await client.lifetimeTask(); + const frames = client.receivedFrames; + + assert.equal(frames.length, 3); + assert.equal(frames[0].messageAsString, textContent); + assert.equal(frames[1].messageAsString, JSON.stringify(jsonContent)); + assert.deepEqual(frames[2].messageBytes, binaryContent); + } + + async webSocketClientWithInitialGroupCanConnectAndReceiveGroupMessages() { + const options = {}; + const serviceClient = new WebPubSubServiceClient(process.env.WEB_PUBSUB_CONNECTION_STRING, 'webSocketClientWithInitialGroupCanConnectAndReceiveGroupMessages', options); + + const group = 'GroupA'; + const url = await serviceClient.getClientAccessUri({ groups: [group] }); + const client = new WebSocketClient(url, this.isSimpleClientEndSignal); + + await client.waitForConnected(); + + const textContent = 'Hello'; + await serviceClient.sendToGroup(group, textContent, 'text/plain'); + const jsonContent = { hello: 'world' }; + await serviceClient.sendToGroup(group, JSON.stringify(jsonContent), 'application/json'); + const binaryContent = Buffer.from('Hello'); + await serviceClient.sendToGroup(group, binaryContent, 'application/octet-stream'); + + await serviceClient.sendToGroup(group, this.getEndSignalBytes(), 'application/octet-stream'); + + await client.lifetimeTask(); + const frames = client.receivedFrames; + + assert.equal(frames.length, 3); + assert.equal(frames[0].messageAsString, textContent); + assert.equal(frames[1].messageAsString, JSON.stringify(jsonContent)); + assert.deepEqual(frames[2].messageBytes, binaryContent); + } + + async subprotocolWebSocketClientCanConnectAndReceiveMessages() { + const options = {}; + const serviceClient = new WebPubSubServiceClient(process.env.WEB_PUBSUB_CONNECTION_STRING, 'subprotocolWebSocketClientCanConnectAndReceiveMessages', options); + + const url = await serviceClient.getClientAccessUri(); + const client = new WebSocketClient(url, this.isSubprotocolClientEndSignal, ws => ws.protocol = 'json.webpubsub.azure.v1'); + + await client.waitForConnected(); + + const textContent = 'Hello'; + await serviceClient.sendToAll(textContent, 'text/plain'); + const jsonContent = { hello: 'world' }; + await serviceClient.sendToAll(JSON.stringify(jsonContent), 'application/json'); + const binaryContent = Buffer.from('Hello'); + await serviceClient.sendToAll(binaryContent, 'application/octet-stream'); + + await serviceClient.sendToAll(this.getEndSignalBytes(), 'application/octet-stream'); + + await client.lifetimeTask(); + const frames = client.receivedFrames; + + assert.equal(frames.length, 4); + const connected = JSON.parse(frames[0].messageAsString); + assert.isNotNull(connected); + assert.equal(connected.event, 'connected'); + assert.equal(frames[1].messageAsString, JSON.stringify({ type: 'message', from: 'server', dataType: 'text', data: textContent })); + assert.equal(frames[2].messageAsString, JSON.stringify({ type: 'message', from: 'server', dataType: 'json', data: jsonContent })); + assert.deepEqual(frames[3].messageBytes, Buffer.from(JSON.stringify({ type: 'message', from: 'server', dataType: 'binary', data: binaryContent.toString('base64') }))); + } + + isSimpleClientEndSignal(frame) { + const bytes = frame.messageBytes; + return bytes.length === 3 && bytes[0] === 5 && bytes[1] === 1 && bytes[2] === 1; + } + + isSubprotocolClientEndSignal(frame) { + return frame.messageAsString === JSON.stringify({ type: 'message', from: 'server', dataType: 'binary', data: 'BQEB' }); + } + + getEndSignalBytes() { + return Buffer.from([5, 1, 1]); + } +} + +class WebSocketClient { + constructor(uri, isEndSignal, configureOptions) { + this.uri = uri; + this.isEndSignal = isEndSignal; + this.ws = new WebSocket(uri); + if (configureOptions) configureOptions(this.ws); + this.receivedFrames = []; + this.waitForConnected = this.connect(); + this.lifetimeTask = this.receiveLoop(); + } + + async connect() { + return new Promise((resolve) => { + this.ws.on('open', resolve); + }); + } + + async receiveLoop() { + await this.waitForConnected; + return new Promise((resolve) => { + this.ws.on('message', (data) => { + const frame = new WebSocketFrame(data, this.ws.protocol); + if (this.isEndSignal(frame)) { + resolve(); + } else { + this.receivedFrames.push(frame); + } + }); + }); + } + + async send(data, options) { + this.ws.send(data, options); + } + + async close() { + this.ws.close(); + } +} + +class WebSocketFrame { + constructor(data, type) { + this.messageBytes = Buffer.from(data); + this.messageAsString = type === 'text' ? data.toString() : null; + } +} + +export default ClientConnectTests; diff --git a/tests/integration-tests/javascript/projectSetup.js b/tests/integration-tests/javascript/projectSetup.js new file mode 100644 index 000000000..2e19ef0e9 --- /dev/null +++ b/tests/integration-tests/javascript/projectSetup.js @@ -0,0 +1,9 @@ +import { WebPubSubServiceClient } from '@azure/web-pubsub'; +import { isLiveMode, assertEnvironmentVariable } from '@azure-tools/test-recorder'; +import { Context } from 'mocha'; +import { assert } from 'chai'; +import ws from 'ws'; + +// Placeholder for the actual test cases and logic +// Since the original file is a project file, it does not contain test cases or logic to translate +// You can add your test cases here diff --git a/tests/integration-tests/javascript/taskExtensions.js b/tests/integration-tests/javascript/taskExtensions.js new file mode 100644 index 000000000..06c6b4c83 --- /dev/null +++ b/tests/integration-tests/javascript/taskExtensions.js @@ -0,0 +1,4 @@ +export async function orTimeout(task, millisecondsDelay = 5000) { + const timeout = new Promise((_, reject) => setTimeout(() => reject(new Error('TimeoutException')), millisecondsDelay)); + await Promise.race([task, timeout]); +} diff --git a/tests/integration-tests/javascript/testEnvironment.js b/tests/integration-tests/javascript/testEnvironment.js new file mode 100644 index 000000000..b8b74f74c --- /dev/null +++ b/tests/integration-tests/javascript/testEnvironment.js @@ -0,0 +1,27 @@ +import { WebPubSubServiceClient } from '@azure/web-pubsub'; +import { isLiveMode, assertEnvironmentVariable } from '@azure-tools/test-recorder'; +import { Context } from 'mocha'; +import { assert } from 'chai'; +import ws from 'ws'; +import dotenv from 'dotenv'; + +class TestEnvironment { + static connectionString; + + static globalSetup() { + // TODO: provision the resources? + // What if multiple different resources needed? + dotenv.config(); + + this.connectionString = process.env.WEB_PUBSUB_CONNECTION_STRING; + if (!this.connectionString) { + throw new Error('Please set the WEB_PUBSUB_CONNECTION_STRING environment variable.'); + } + } + + static globalTeardown() { + // Clean up any resources if necessary + } +} + +export default TestEnvironment; diff --git a/tests/integration-tests/python/task_extensions.py b/tests/integration-tests/python/task_extensions.py new file mode 100644 index 000000000..e58b17dc3 --- /dev/null +++ b/tests/integration-tests/python/task_extensions.py @@ -0,0 +1,9 @@ +import asyncio + +class TaskExtensions: + @staticmethod + async def or_timeout(task, milliseconds_delay=5000): + try: + await asyncio.wait_for(task, timeout=milliseconds_delay / 1000) + except asyncio.TimeoutError: + raise TimeoutError() diff --git a/tests/integration-tests/python/test_client_connect.py b/tests/integration-tests/python/test_client_connect.py new file mode 100644 index 000000000..9ffebab38 --- /dev/null +++ b/tests/integration-tests/python/test_client_connect.py @@ -0,0 +1,133 @@ +import json +import pytest +import asyncio +from azure.core.exceptions import HttpResponseError, ServiceRequestError +from azure.messaging.webpubsubservice._operations._operations import build_send_to_all_request +# from devtools_testutils import recorded_by_proxy +from testcase import WebpubsubPowerShellPreparer, WebpubsubTest +import websockets + +class TestClientConnect(WebpubsubTest): + + @pytest.mark.asyncio + async def test_simple_websocket_client_can_connect_and_receive_messages(self): + options = {} + service_client = WebPubSubServiceClient(self.connection_string, 'test_simple_websocket_client_can_connect_and_receive_messages', options) + + url = await service_client.get_client_access_uri() + async with WebSocketClient(url, self.is_simple_client_end_signal) as client: + await client.wait_for_connected() + + text_content = 'Hello' + await service_client.send_to_all(text_content, 'text/plain') + json_content = {'hello': 'world'} + await service_client.send_to_all(json.dumps(json_content), 'application/json') + binary_content = b'Hello' + await service_client.send_to_all(binary_content, 'application/octet-stream') + + await service_client.send_to_all(self.get_end_signal_bytes(), 'application/octet-stream') + + await client.lifetime_task() + frames = client.received_frames + + assert len(frames) == 3 + assert frames[0].message_as_string == text_content + assert frames[1].message_as_string == json.dumps(json_content) + assert frames[2].message_bytes == binary_content + + @pytest.mark.asyncio + async def test_websocket_client_with_initial_group_can_connect_and_receive_group_messages(self): + options = {} + service_client = WebPubSubServiceClient(self.connection_string, 'test_websocket_client_with_initial_group_can_connect_and_receive_group_messages', options) + + group = 'GroupA' + url = await service_client.get_client_access_uri(groups=[group]) + async with WebSocketClient(url, self.is_simple_client_end_signal) as client: + await client.wait_for_connected() + + text_content = 'Hello' + await service_client.send_to_group(group, text_content, 'text/plain') + json_content = {'hello': 'world'} + await service_client.send_to_group(group, json.dumps(json_content), 'application/json') + binary_content = b'Hello' + await service_client.send_to_group(group, binary_content, 'application/octet-stream') + + await service_client.send_to_group(group, self.get_end_signal_bytes(), 'application/octet-stream') + + await client.lifetime_task() + frames = client.received_frames + + assert len(frames) == 3 + assert frames[0].message_as_string == text_content + assert frames[1].message_as_string == json.dumps(json_content) + assert frames[2].message_bytes == binary_content + + @pytest.mark.asyncio + async def test_subprotocol_websocket_client_can_connect_and_receive_messages(self): + options = {} + service_client = WebPubSubServiceClient(self.connection_string, 'test_subprotocol_websocket_client_can_connect_and_receive_messages', options) + + url = await service_client.get_client_access_uri() + async with WebSocketClient(url, self.is_subprotocol_client_end_signal, subprotocol='json.webpubsub.azure.v1') as client: + await client.wait_for_connected() + + text_content = 'Hello' + await service_client.send_to_all(text_content, 'text/plain') + json_content = {'hello': 'world'} + await service_client.send_to_all(json.dumps(json_content), 'application/json') + binary_content = b'Hello' + await service_client.send_to_all(binary_content, 'application/octet-stream') + + await service_client.send_to_all(self.get_end_signal_bytes(), 'application/octet-stream') + + await client.lifetime_task() + frames = client.received_frames + + assert len(frames) == 4 + connected = json.loads(frames[0].message_as_string) + assert connected is not None + assert connected['event'] == 'connected' + assert frames[1].message_as_string == json.dumps({'type': 'message', 'from': 'server', 'dataType': 'text', 'data': text_content}) + assert frames[2].message_as_string == json.dumps({'type': 'message', 'from': 'server', 'dataType': 'json', 'data': json_content}) + assert frames[3].message_bytes == json.dumps({'type': 'message', 'from': 'server', 'dataType': 'binary', 'data': binary_content.decode()}).encode() + + def is_simple_client_end_signal(self, frame): + bytes = frame.message_bytes + return len(bytes) == 3 and bytes[0] == 5 and bytes[1] == 1 and bytes[2] == 1 + + def is_subprotocol_client_end_signal(self, frame): + return frame.message_as_string == json.dumps({'type': 'message', 'from': 'server', 'dataType': 'binary', 'data': 'BQEB'}) + + def get_end_signal_bytes(self): + return b'\x05\x01\x01' + +class WebSocketFrame: + def __init__(self, bytes, type): + self.message_bytes = bytes + self.message_as_string = bytes.decode('utf-8') if type == 'text' else None + +class WebSocketClient: + def __init__(self, uri, is_end_signal, subprotocol=None): + self.uri = uri + self.is_end_signal = is_end_signal + self.subprotocol = subprotocol + self.received_frames = [] + self.websocket = None + + async def __aenter__(self): + self.websocket = await websockets.connect(self.uri, subprotocols=[self.subprotocol] if self.subprotocol else None) + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.websocket.close() + + async def wait_for_connected(self): + pass # Implement wait for connected logic if needed + + async def lifetime_task(self): + while True: + message = await self.websocket.recv() + frame = WebSocketFrame(message, self.websocket.subprotocol) + if self.is_end_signal(frame): + break + self.received_frames.append(frame) diff --git a/tests/integration-tests/python/test_environment.py b/tests/integration-tests/python/test_environment.py new file mode 100644 index 000000000..50adc099d --- /dev/null +++ b/tests/integration-tests/python/test_environment.py @@ -0,0 +1,18 @@ +import os +import pytest + +class TestEnvironment: + connection_string = None + + @pytest.fixture(scope='session', autouse=True) + def global_setup(self): + # TODO: provision the resources? + # What if multiple different resources needed? + self.connection_string = os.getenv('WEB_PUBSUB_CONNECTION_STRING') + if not self.connection_string: + raise ValueError('Please set the WEB_PUBSUB_CONNECTION_STRING environment variable.') + + @pytest.fixture(scope='session', autouse=True) + def global_teardown(self): + # Clean up any resources if necessary + yield \ No newline at end of file diff --git a/tests/integration-tests/python/test_project_setup.py b/tests/integration-tests/python/test_project_setup.py new file mode 100644 index 000000000..672ef84e7 --- /dev/null +++ b/tests/integration-tests/python/test_project_setup.py @@ -0,0 +1,10 @@ +import json +import pytest +from azure.core.exceptions import HttpResponseError, ServiceRequestError +from azure.messaging.webpubsubservice._operations._operations import build_send_to_all_request +# from devtools_testutils import recorded_by_proxy +from testcase import WebpubsubPowerShellPreparer, WebpubsubTest + +# Placeholder for the actual test cases and logic +# Since the original file is a project file, it does not contain test cases or logic to translate +# You can add your test cases here \ No newline at end of file