diff --git a/symphony-bdk-config/src/main/java/com/symphony/bdk/core/config/model/BdkDatafeedConfig.java b/symphony-bdk-config/src/main/java/com/symphony/bdk/core/config/model/BdkDatafeedConfig.java index 63893dfb4..46acdf76b 100644 --- a/symphony-bdk-config/src/main/java/com/symphony/bdk/core/config/model/BdkDatafeedConfig.java +++ b/symphony-bdk-config/src/main/java/com/symphony/bdk/core/config/model/BdkDatafeedConfig.java @@ -16,6 +16,11 @@ public class BdkDatafeedConfig { private String version = "v2"; private String idFilePath; private BdkRetryConfig retry = new BdkRetryConfig(BdkRetryConfig.INFINITE_MAX_ATTEMPTS); + /** + * Agent 26.2.1 or later required to enable includeInvisible. + */ + private boolean includeInvisible = false; + private String tag; public void setVersion(String version) { if ("v1".equalsIgnoreCase(version)) { diff --git a/symphony-bdk-core/build.gradle b/symphony-bdk-core/build.gradle index 759fdd14c..60ee1a945 100644 --- a/symphony-bdk-core/build.gradle +++ b/symphony-bdk-core/build.gradle @@ -76,7 +76,7 @@ dependencies { } // OpenAPI code generation -def apiBaseUrl = "https://raw.githubusercontent.com/finos/symphony-api-spec/ee09734380226ac1109a1513156ceefac3bd5a1e" +def apiBaseUrl = "https://raw.githubusercontent.com/finos/symphony-api-spec/5526c5e81cb2313c4aab18119b526534652ba98e" def generatedFolder = "$buildDir/generated/openapi" def apisToGenerate = [ Agent: 'agent/agent-api-public-deprecated.yaml', diff --git a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedLoopV2.java b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedLoopV2.java index 551ef4a3d..458c343b9 100644 --- a/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedLoopV2.java +++ b/symphony-bdk-core/src/main/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedLoopV2.java @@ -14,6 +14,7 @@ import com.symphony.bdk.http.api.ApiException; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apiguardian.api.API; import java.util.List; @@ -47,6 +48,7 @@ public class DatafeedLoopV2 extends AbstractAckIdEventLoop { * Datahose feeds are in the format *_p_*, e.g. "d25098517ec62f1fc65cd111667a8386_p_be940". */ private static final Pattern FANOUT_FEED_PATTERN = Pattern.compile("^[^\\s_]+_f(_[^\\s_]+)?$"); + private static final String FANOUT_TYPE = "fanout"; private final RetryWithRecoveryBuilder retryWithRecoveryBuilder; private final RetryWithRecovery readDatafeed; @@ -54,6 +56,8 @@ public class DatafeedLoopV2 extends AbstractAckIdEventLoop { private final RetryWithRecovery createDatafeed; private final RetryWithRecovery deleteDatafeed; + private final V5DatafeedCreateBody datafeedCreateBody; + private V5Datafeed datafeed; public DatafeedLoopV2(DatafeedApi datafeedApi, AuthSession authSession, BdkConfig config, UserV2 botInfo) { @@ -87,6 +91,14 @@ public DatafeedLoopV2(DatafeedApi datafeedApi, AuthSession authSession, BdkConfi .supplier(this::doDeleteDatafeed) .ignoreException(ApiException::isClientError) .build(); + + datafeedCreateBody = new V5DatafeedCreateBody(); + if (config.getDatafeed().isIncludeInvisible()) { + datafeedCreateBody.setIncludeInvisible(true); + } + if (StringUtils.isNotBlank(config.getDatafeed().getTag())) { + datafeedCreateBody.setTag(config.getDatafeed().getTag()); + } } @Override @@ -108,7 +120,7 @@ private V5Datafeed doCreateDatafeed() throws ApiException { return this.datafeedApi.createDatafeed( this.authSession.getSessionToken(), this.authSession.getKeyManagerToken(), - new V5DatafeedCreateBody() + datafeedCreateBody ); } @@ -118,10 +130,18 @@ private V5Datafeed doRetrieveDatafeed() throws ApiException { this.authSession.getKeyManagerToken(), null ); - return feeds.stream().filter(this::isFanoutFeed).findFirst().orElse(null); + return feeds.stream().filter(this::isMatchingFeed).findFirst().orElse(null); } - private boolean isFanoutFeed(V5Datafeed d) { + private boolean isMatchingFeed(V5Datafeed d) { + if (this.datafeedCreateBody.getTag() != null) { + if (this.datafeedCreateBody.getTag().equals(d.getId()) + && FANOUT_TYPE.equalsIgnoreCase(d.getType())) { + return true; + } else { + return false; + } + } final String datafeedId = d.getId(); return datafeedId != null && FANOUT_FEED_PATTERN.matcher(datafeedId).matches(); } diff --git a/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedLoopV2Test.java b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedLoopV2Test.java index ca81de52e..458e9ab84 100644 --- a/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedLoopV2Test.java +++ b/symphony-bdk-core/src/test/java/com/symphony/bdk/core/service/datafeed/impl/DatafeedLoopV2Test.java @@ -127,7 +127,7 @@ void testStartWithoutTag() throws ApiException, AuthUnauthorizedException { void testStartInvalidExistingFeeds(String invalidExistingFeedId) throws ApiException, AuthUnauthorizedException { when(datafeedApi.listDatafeed(TOKEN, TOKEN, null)) .thenReturn(Collections.singletonList(new V5Datafeed().id(invalidExistingFeedId))); - when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody())).thenReturn( + when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody().includeInvisible(true))).thenReturn( new V5Datafeed().id(DATAFEED_ID)); AckId ackId = new AckId().ackId(datafeedService.getAckId()); @@ -138,7 +138,7 @@ void testStartInvalidExistingFeeds(String invalidExistingFeedId) throws ApiExcep this.datafeedService.start(); - V5DatafeedCreateBody datafeedCreateBody = new V5DatafeedCreateBody(); + V5DatafeedCreateBody datafeedCreateBody = new V5DatafeedCreateBody().includeInvisible(true); verify(datafeedApi, times(1)).listDatafeed(TOKEN, TOKEN, null); verify(datafeedApi, times(1)).createDatafeed(TOKEN, TOKEN, datafeedCreateBody); verify(datafeedApi, times(1)).readDatafeed(DATAFEED_ID, TOKEN, TOKEN, ackId); @@ -149,7 +149,7 @@ void testStartInvalidExistingFeeds(String invalidExistingFeedId) throws ApiExcep void testStartValidExistingFeeds(String validExistingFeedId) throws ApiException, AuthUnauthorizedException { when(datafeedApi.listDatafeed(TOKEN, TOKEN, null)) .thenReturn(Collections.singletonList(new V5Datafeed().id(validExistingFeedId))); - when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody())).thenReturn( + when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody().includeInvisible(true))).thenReturn( new V5Datafeed().id(validExistingFeedId)); AckId ackId = new AckId().ackId(datafeedService.getAckId()); @@ -160,7 +160,7 @@ void testStartValidExistingFeeds(String validExistingFeedId) throws ApiException this.datafeedService.start(); - V5DatafeedCreateBody datafeedCreateBody = new V5DatafeedCreateBody(); + V5DatafeedCreateBody datafeedCreateBody = new V5DatafeedCreateBody().includeInvisible(true); verify(datafeedApi, times(1)).listDatafeed(TOKEN, TOKEN, null); verify(datafeedApi, times(0)).createDatafeed(TOKEN, TOKEN, datafeedCreateBody); verify(datafeedApi, times(1)).readDatafeed(validExistingFeedId, TOKEN, TOKEN, ackId); @@ -337,7 +337,7 @@ private ArgumentMatcher eqAckId(String ackId) { @Test void testStartEmptyListDatafeed() throws ApiException, AuthUnauthorizedException { when(datafeedApi.listDatafeed(TOKEN, TOKEN, null)).thenReturn(Collections.emptyList()); - when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody())).thenReturn( + when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody().includeInvisible(true))).thenReturn( new V5Datafeed().id(DATAFEED_ID)); AckId initialAckId = new AckId().ackId(""); final String secondAckId = "ack-id"; @@ -349,11 +349,11 @@ void testStartEmptyListDatafeed() throws ApiException, AuthUnauthorizedException this.datafeedService.start(); verify(datafeedApi, times(1)).listDatafeed(TOKEN, TOKEN, null); - verify(datafeedApi, times(1)).createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody()); + verify(datafeedApi, times(1)).createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody().includeInvisible(true)); verify(datafeedApi, times(1)).readDatafeed(DATAFEED_ID, TOKEN, TOKEN, initialAckId); assertEquals(secondAckId, datafeedService.getAckId()); } - + @Test void testClientErrorTriggersDatafeedRecreation() throws ApiException, AuthUnauthorizedException { when(datafeedApi.listDatafeed(TOKEN, TOKEN, null)).thenReturn( @@ -370,7 +370,7 @@ void testClientErrorTriggersDatafeedRecreation() throws ApiException, AuthUnauth .thenThrow(new ApiException(400, "")); when(datafeedApi.deleteDatafeed(DATAFEED_ID, TOKEN, TOKEN)).thenReturn(null); - when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody())).thenReturn( + when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody().includeInvisible(true))).thenReturn( new V5Datafeed().id(secondDatafeedId)); when(datafeedApi.readDatafeed(secondDatafeedId, TOKEN, TOKEN, initialAckId)) .thenReturn(new V5EventList().addEventsItem( @@ -470,35 +470,35 @@ void testStartErrorListDatafeedThenRetrySuccess() throws ApiException, AuthUnaut @Test void testStartAuthErrorCreateDatafeed() throws ApiException, AuthUnauthorizedException { when(datafeedApi.listDatafeed(TOKEN, TOKEN, null)).thenReturn(Collections.emptyList()); - when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody())).thenThrow( + when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody().includeInvisible(true))).thenThrow( new ApiException(401, "unauthorized-error")); doThrow(AuthUnauthorizedException.class).when(authSession).refresh(); assertThrows(AuthUnauthorizedException.class, this.datafeedService::start); verify(datafeedApi, times(1)).listDatafeed(TOKEN, TOKEN, null); - verify(datafeedApi, times(1)).createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody()); + verify(datafeedApi, times(1)).createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody().includeInvisible(true)); } @Test void testStartClientErrorCreateDatafeed() throws ApiException { when(datafeedApi.listDatafeed(TOKEN, TOKEN, null)).thenReturn(Collections.emptyList()); - when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody())).thenThrow( + when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody().includeInvisible(true))).thenThrow( new ApiException(400, "client-error")); assertThrows(ApiException.class, this.datafeedService::start); verify(datafeedApi, times(1)).listDatafeed(TOKEN, TOKEN, null); - verify(datafeedApi, times(2)).createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody()); + verify(datafeedApi, times(2)).createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody().includeInvisible(true)); } @Test void testStartServerErrorCreateDatafeed() throws ApiException { when(datafeedApi.listDatafeed(TOKEN, TOKEN, null)).thenReturn(Collections.emptyList()); - when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody())).thenThrow( + when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody().includeInvisible(true))).thenThrow( new ApiException(502, "server-error")); assertThrows(ApiException.class, this.datafeedService::start); verify(datafeedApi, times(1)).listDatafeed(TOKEN, TOKEN, null); - verify(datafeedApi, times(2)).createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody()); + verify(datafeedApi, times(2)).createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody().includeInvisible(true)); } @Test @@ -506,7 +506,7 @@ void testStartClientErrorReadDatafeedAndClientErrorCreateDatafeed() throws ApiEx AckId ackId = new AckId().ackId(datafeedService.getAckId()); when(datafeedApi.listDatafeed(TOKEN, TOKEN, null)).thenReturn( Collections.singletonList(new V5Datafeed().id(DATAFEED_ID))); - when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody())) + when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody().includeInvisible(true))) .thenThrow(new ApiException(400, "ALB: No matching rule found")) .thenReturn(new V5Datafeed().id("recreate-df-id")); when(datafeedApi.readDatafeed(DATAFEED_ID, TOKEN, TOKEN, ackId)).thenThrow(new ApiException(400, "client-error")); @@ -520,7 +520,7 @@ void testStartClientErrorReadDatafeedAndClientErrorCreateDatafeed() throws ApiEx verify(datafeedApi, times(1)).readDatafeed(DATAFEED_ID, TOKEN, TOKEN, ackId); verify(datafeedApi, times(1)).readDatafeed("recreate-df-id", TOKEN, TOKEN, ackId); verify(datafeedApi, times(1)).deleteDatafeed(DATAFEED_ID, TOKEN, TOKEN); - verify(datafeedApi, times(2)).createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody()); + verify(datafeedApi, times(2)).createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody().includeInvisible(true)); } @Test @@ -613,7 +613,7 @@ void testStartClientErrorDeleteDatafeed() throws ApiException, AuthUnauthorizedE AckId ackId = new AckId().ackId(datafeedService.getAckId()); when(datafeedApi.listDatafeed(TOKEN, TOKEN, null)).thenReturn( Collections.singletonList(new V5Datafeed().id(DATAFEED_ID))); - when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody())).thenReturn( + when(datafeedApi.createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody().includeInvisible(true))).thenReturn( new V5Datafeed().id("recreate-df-id")); when(datafeedApi.readDatafeed(DATAFEED_ID, TOKEN, TOKEN, ackId)).thenThrow(new ApiException(400, "client-error")); when(datafeedApi.readDatafeed("recreate-df-id", TOKEN, TOKEN, ackId)) @@ -626,7 +626,7 @@ void testStartClientErrorDeleteDatafeed() throws ApiException, AuthUnauthorizedE verify(datafeedApi, times(1)).listDatafeed(TOKEN, TOKEN, null); verify(datafeedApi, times(1)).readDatafeed(DATAFEED_ID, TOKEN, TOKEN, ackId); verify(datafeedApi, times(1)).readDatafeed("recreate-df-id", TOKEN, TOKEN, ackId); - verify(datafeedApi, times(1)).createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody()); + verify(datafeedApi, times(1)).createDatafeed(TOKEN, TOKEN, new V5DatafeedCreateBody().includeInvisible(true)); verify(datafeedApi, times(1)).deleteDatafeed(DATAFEED_ID, TOKEN, TOKEN); } diff --git a/symphony-bdk-core/src/test/resources/config/config.yaml b/symphony-bdk-core/src/test/resources/config/config.yaml index 63241c37e..7b3efbd98 100644 --- a/symphony-bdk-core/src/test/resources/config/config.yaml +++ b/symphony-bdk-core/src/test/resources/config/config.yaml @@ -20,6 +20,7 @@ app: path: /Users/local/conf/agent/privatekey.pem datafeed: + includeInvisible: true retry: maxAttempts: 2