From a0cf34f350936bd8dfac395f1b448719e7d29e42 Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 5 Mar 2026 13:42:55 -0500 Subject: [PATCH 1/2] Scheduling sources --- build-tester/build.gradle | 206 ------------------ build-tester/settings.gradle | 8 - schedule-message/build.gradle | 3 +- .../io/synadia/examples/ScheduleBasics.java | 16 +- .../synadia/examples/ScheduleFromSource.java | 108 +++++++++ ...leExampleUtils.java => ScheduleUtils.java} | 2 +- .../synadia/sm/ScheduledMessageBuilder.java | 63 +++++- 7 files changed, 180 insertions(+), 226 deletions(-) delete mode 100644 build-tester/build.gradle delete mode 100644 build-tester/settings.gradle create mode 100644 schedule-message/src/examples/java/io/synadia/examples/ScheduleFromSource.java rename schedule-message/src/examples/java/io/synadia/examples/{ScheduleExampleUtils.java => ScheduleUtils.java} (97%) diff --git a/build-tester/build.gradle b/build-tester/build.gradle deleted file mode 100644 index 323a6cc..0000000 --- a/build-tester/build.gradle +++ /dev/null @@ -1,206 +0,0 @@ -import aQute.bnd.gradle.Bundle - -plugins { - id("java") - id("java-library") - id("maven-publish") - id("jacoco") - id("biz.aQute.bnd.builder") version "7.1.0" - id("org.gradle.test-retry") version "1.6.4" - id("io.github.gradle-nexus.publish-plugin") version "2.0.0" - id("signing") -} - -def jarVersion = "0.0.7" -group = 'io.synadia' - -def isRelease = System.getenv("BUILD_EVENT") == "release" - -def tc = System.getenv("TARGET_COMPATIBILITY"); -def targetCompat = tc == "21" ? JavaVersion.VERSION_21 : (tc == "17" ? JavaVersion.VERSION_17 : JavaVersion.VERSION_1_8) -def jarEnd = tc == "21" ? "-jdk21" : (tc == "17" ? "-jdk17" : "") -def jarAndArtifactName = "build-tester" + jarEnd - -version = isRelease ? jarVersion : jarVersion + "-SNAPSHOT" // version is the variable the build actually uses. - -java { - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = targetCompat -} - -repositories { - mavenCentral() - maven { url="https://repo1.maven.org/maven2/" } - maven { url="https://central.sonatype.com/repository/maven-snapshots" } -} - -dependencies { - implementation 'io.nats:jnats:2.25.1' - implementation 'org.jspecify:jspecify:1.0.0' - implementation 'io.synadia:counters:0.2.1-SNAPSHOT' - - testImplementation 'org.junit.jupiter:junit-jupiter:5.14.1' - testImplementation 'org.junit.platform:junit-platform-launcher:1.14.3' -} - -sourceSets { - main { - java { - srcDirs = ['src/main/java','src/examples/java'] - } - resources { - srcDirs = ['src/examples/resources'] - } - } - test { - java { - srcDirs = ['src/test/java'] - } - } -} - -tasks.register('bundle', Bundle) { - from sourceSets.main.output - exclude("io/synadia/bt/examples/**") -} - -jar { - bundle { - bnd("Bundle-Name": "io.synadia.build.tester", - "Bundle-Vendor": "synadia.io", - "Bundle-Description": "Build Tester", - "Bundle-DocURL": "https://synadia.io" - ) - } -} - -test { - // Use junit platform for unit tests - useJUnitPlatform() - testLogging { - exceptionFormat = 'full' - events "started", "passed", "skipped", "failed" - showStandardStreams = true - } - retry { - failOnPassedAfterRetry = false - maxFailures = 3 - maxRetries = 3 - } - systemProperty 'junit.jupiter.execution.timeout.default', '3m' -} - -javadoc { - options.overview = 'src/main/javadoc/overview.html' // relative to source root - source = sourceSets.main.allJava - title = "Synadia Communications Inc. Build Tester" - excludes ['**/examples/**'] - classpath = sourceSets.main.runtimeClasspath -} - -tasks.register('examplesJar', Jar) { - archiveClassifier.set('examples') - manifest { - attributes('Implementation-Title': 'Build Tester Examples', - 'Implementation-Version': jarVersion, - 'Implementation-Vendor': 'synadia.io') - } - from(sourceSets.main.output) { - include "io/synadia/bt/examples/**" - } -} - -tasks.register('javadocJar', Jar) { - archiveClassifier.set('javadoc') - from javadoc -} - -tasks.register('sourcesJar', Jar) { - archiveClassifier.set('sources') - from sourceSets.main.allSource -} - -tasks.register('testsJar', Jar) { - archiveClassifier.set('tests') - from sourceSets.test.allSource -} - -artifacts { - archives javadocJar, sourcesJar, examplesJar, testsJar -} - -jacoco { - toolVersion = "0.8.12" -} - -jacocoTestReport { - reports { - xml.required = true // coveralls plugin depends on xml format report - html.required = true - } - afterEvaluate { // only report on main library not examples - classDirectories.setFrom(files(classDirectories.files.collect { - fileTree(dir: it, - exclude: ['**/examples**','**/Debug**']) - })) - } -} - -nexusPublishing { - repositories { - sonatype { - nexusUrl.set(uri("https://ossrh-staging-api.central.sonatype.com/service/local/")) - snapshotRepositoryUrl.set(uri("https://central.sonatype.com/repository/maven-snapshots/")) - username = System.getenv('OSSRH_USERNAME') - password = System.getenv('OSSRH_PASSWORD') - } - } -} - -publishing { - publications { - mavenJava(MavenPublication) { - from components.java - artifact sourcesJar - artifact examplesJar - artifact javadocJar - artifact testsJar - pom { - name = jarAndArtifactName - packaging = "jar" - groupId = group - artifactId = jarAndArtifactName - description = "Synadia Communications Inc. Build Tester" - url = "https://github.com/synadia-io/orbit.java/tree/main/build-tester" - licenses { - license { - name = 'The Apache License, Version 2.0' - url = 'http://www.apache.org/licenses/LICENSE-2.0.txt' - } - } - developers { - developer { - id = "synadia" - name = "Synadia" - email = "info@synadia.com" - url = "https://synadia.io" - } - } - scm { - url = "https://github.com/synadia-io/orbit.java" - } - } - } - } -} - -if (isRelease) { - signing { - def signingKeyId = System.getenv('SIGNING_KEY_ID') - def signingKey = System.getenv('SIGNING_KEY') - def signingPassword = System.getenv('SIGNING_PASSWORD') - useInMemoryPgpKeys(signingKeyId, signingKey, signingPassword) - sign configurations.archives - sign publishing.publications.mavenJava - } -} diff --git a/build-tester/settings.gradle b/build-tester/settings.gradle deleted file mode 100644 index 34bf496..0000000 --- a/build-tester/settings.gradle +++ /dev/null @@ -1,8 +0,0 @@ -pluginManagement { - repositories { - mavenCentral() - maven { url "https://oss.sonatype.org/content/repositories/releases/" } - maven { url "https://plugins.gradle.org/m2/" } - } -} -rootProject.name = 'build-tester' diff --git a/schedule-message/build.gradle b/schedule-message/build.gradle index f42c512..1f1fcf0 100644 --- a/schedule-message/build.gradle +++ b/schedule-message/build.gradle @@ -29,13 +29,14 @@ java { } repositories { + mavenLocal() mavenCentral() maven { url="https://repo1.maven.org/maven2/" } maven { url="https://central.sonatype.com/repository/maven-snapshots" } } dependencies { - implementation 'io.nats:jnats:2.25.1' + implementation 'io.nats:jnats:2.25.3-SNAPSHOT' implementation 'org.jspecify:jspecify:1.0.0' implementation 'io.synadia:counters:0.2.2' diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasics.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasics.java index fb26069..631a00b 100644 --- a/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasics.java +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleBasics.java @@ -13,10 +13,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static io.synadia.examples.ScheduleExampleUtils.report; +import static io.synadia.examples.ScheduleUtils.report; public class ScheduleBasics { - public static final String STREAM = "scheduler"; + public static final String STREAM = "schedules-enabled"; public static final String SCHEDULE_PREFIX = "schedule."; public static final String TARGET_PREFIX = "target."; @@ -33,7 +33,7 @@ public static void main(String[] args) { .errorListener(new ErrorListener() {}) .build(); - try (Connection connection = Nats.connectReconnectOnConnect(options)) { + try (Connection connection = Nats.connect(options)) { JetStreamManagement jsm = connection.jetStreamManagement();; JetStream js = connection.jetStream(); @@ -49,13 +49,13 @@ public static void main(String[] args) { // subscribe to the subject that receives the schedule message js.subscribe(SCHEDULES, d, m -> { - report("SCHEDULED", m); + report("SCHEDULED (received)", m); m.ack(); }, false); // subscribe to the target subject js.subscribe(TARGETS, d, m -> { - report("RECEIVED", m); + report("TARGETED (received)", m); m.ack(); latch.countDown(); }, false); @@ -66,7 +66,7 @@ public static void main(String[] args) { .scheduleImmediate() .data("Schedule-Now") .build(); - report("PUBLISH", m); + report("SCHEDULE-NOW (sending)", m); js.publish(m); m = new ScheduledMessageBuilder() @@ -75,7 +75,7 @@ public static void main(String[] args) { .scheduleAt(DateTimeUtils.gmtNow().plusSeconds(5)) .data("Scheduled-At") .build(); - report("PUBLISH", m); + report("SCHEDULE-AT (sending)", m); js.publish(m); m = new ScheduledMessageBuilder() @@ -84,7 +84,7 @@ public static void main(String[] args) { .scheduleEvery(1, TimeUnit.SECONDS) .data("Every Second") .build(); - report("PUBLISH", m); + report("SCHEDULE-EVERY (sending)", m); js.publish(m); latch.await(); diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleFromSource.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleFromSource.java new file mode 100644 index 0000000..0627381 --- /dev/null +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleFromSource.java @@ -0,0 +1,108 @@ +// Copyright (c) 2025 Synadia Communications Inc. All Rights Reserved. +// See LICENSE and NOTICE file for details. + +package io.synadia.examples; + +import io.nats.client.*; +import io.nats.client.api.StorageType; +import io.nats.client.api.StreamInfo; +import io.nats.client.impl.Headers; +import io.nats.client.impl.NatsMessage; +import io.synadia.sm.ScheduledMessageBuilder; +import io.synadia.sm.ScheduledStreamUtil; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; + +import static io.synadia.examples.ScheduleUtils.report; + +public class ScheduleFromSource { + public static final String STREAM = "schedules-enabled"; + + private static final String SCHEDULES = "schedules"; + private static final String TARGET = "target"; + private static final String SOURCE = "source"; + + public static final String[] STREAM_SUBJECTS = new String[]{SCHEDULES, TARGET, SOURCE}; + + public static void main(String[] args) { + try { + Options options = new Options.Builder() + .server("nats://localhost:4222") + .errorListener(new ErrorListener() {}) + .build(); + + try (Connection connection = Nats.connect(options)) { + JetStreamManagement jsm = connection.jetStreamManagement(); + JetStream js = connection.jetStream(); + + // delete the stream in case it existed, just for a fresh example + try { jsm.deleteStream(STREAM); } catch (Exception ignore) {} + + // Use the utility to properly create a schedulable stream + StreamInfo si = ScheduledStreamUtil.createSchedulableStream(jsm, STREAM, StorageType.Memory, STREAM_SUBJECTS); + report("Created stream", si.getConfiguration()); + + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(2); + Dispatcher d = connection.createDispatcher(); + + // subscribe to the subject that receives the schedule message + js.subscribe(SCHEDULES, d, m -> { + report("SCHEDULED (received)", m); + m.ack(); + }, false); + + // subscribe to the target subject + js.subscribe(SOURCE, d, m -> { + report("SOURCED (received)", m); + m.ack(); + }, false); + + // subscribe to the target subject + js.subscribe(TARGET, d, m -> { + report("TARGETED (received)", m); + m.ack(); + latch1.countDown(); + latch2.countDown(); + }, false); + + // Publish Data to the Source subject + String sourceData = "data1"; + Headers sourceHeaders = new Headers(); + sourceHeaders.put("foo1", "bar1"); + Message sourceMessage = new NatsMessage(SOURCE, null, sourceHeaders, sourceData.getBytes()); + report("SOURCE 1 (sending)", sourceMessage); + js.publish(sourceMessage); + connection.flush(Duration.ofSeconds(1)); + + Message scheduleMessage = new ScheduledMessageBuilder() + .scheduleSubject(SCHEDULES) + .targetSubject(TARGET) + .scheduleImmediate() + .sources(SOURCE) + .build(); + report("SCHEDULE 1 (sending)", scheduleMessage); + js.publish(scheduleMessage); + + latch1.await(); + + sourceData = "data2"; + sourceHeaders = new Headers(); + sourceHeaders.put("foo2", "bar2"); + sourceMessage = new NatsMessage(SOURCE, null, sourceHeaders, sourceData.getBytes()); + report("SOURCE 2 (sending)", sourceMessage); + js.publish(sourceMessage); + connection.flush(Duration.ofSeconds(1)); + + report("SCHEDULE 2 (sending)", scheduleMessage); + js.publish(scheduleMessage); + + latch2.await(); + } + } + catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/schedule-message/src/examples/java/io/synadia/examples/ScheduleExampleUtils.java b/schedule-message/src/examples/java/io/synadia/examples/ScheduleUtils.java similarity index 97% rename from schedule-message/src/examples/java/io/synadia/examples/ScheduleExampleUtils.java rename to schedule-message/src/examples/java/io/synadia/examples/ScheduleUtils.java index 9e76501..fac2489 100644 --- a/schedule-message/src/examples/java/io/synadia/examples/ScheduleExampleUtils.java +++ b/schedule-message/src/examples/java/io/synadia/examples/ScheduleUtils.java @@ -6,7 +6,7 @@ import io.nats.client.Message; import io.nats.client.impl.Headers; -public class ScheduleExampleUtils { +public class ScheduleUtils { public static void report(Object... objects) { StringBuilder sb = new StringBuilder(); diff --git a/schedule-message/src/main/java/io/synadia/sm/ScheduledMessageBuilder.java b/schedule-message/src/main/java/io/synadia/sm/ScheduledMessageBuilder.java index 6401be2..4725560 100644 --- a/schedule-message/src/main/java/io/synadia/sm/ScheduledMessageBuilder.java +++ b/schedule-message/src/main/java/io/synadia/sm/ScheduledMessageBuilder.java @@ -16,6 +16,9 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; /** @@ -25,11 +28,13 @@ public class ScheduledMessageBuilder { public static final long NANOS_PER_SECOND = 1_000_000_000L; private String scheduleString; + private String timezone; private String scheduleSubject; private String targetSubject; private Headers headers; private byte[] data; private MessageTtl messageTtl; + private final List sources = new ArrayList<>(); public ScheduledMessageBuilder() {} @@ -88,7 +93,7 @@ public ScheduledMessageBuilder data(String data, final Charset charset) { } /** - * Set the headers + * Set message headers * @param headers the headers * @return the builder */ @@ -108,7 +113,27 @@ public ScheduledMessageBuilder copy(Message message) { } /** - * Schedule for at a specific time + * Schedule for an amount of time from now. + * This is not absolute since it takes time to build and send the message. + * @param fromNow how long from now to schedule + * @return a ScheduledMessageBuilder object + */ + public ScheduledMessageBuilder scheduleIn(Duration fromNow) { + return scheduleAt(ZonedDateTime.now().plus(fromNow)); + } + + /** + * Schedule for an amount of time from now. + * This is not absolute since it takes time to build and send the message. + * @param fromNow how long from now to schedule + * @return a ScheduledMessageBuilder object + */ + public ScheduledMessageBuilder scheduleIn(long fromNow, TimeUnit timeUnit) { + return scheduleAt(ZonedDateTime.now().plusNanos(timeUnit.toNanos(fromNow))); + } + + /** + * Schedule for a specific time * @param zdt the time to schedule * @return a ScheduledMessageBuilder object */ @@ -191,11 +216,39 @@ public ScheduledMessageBuilder scheduleCron(String cron) { return this; } + /** + * Schedule based on standard cron + * @param cron A valid cron string + * @param timezone a valid IANA time zone + * @return a ScheduledMessageBuilder object + */ + public ScheduledMessageBuilder scheduleCron(String cron, String timezone) { + scheduleString = Validator.emptyAsNull(cron); + this.timezone = timezone; + return this; + } + public ScheduledMessageBuilder messageTtl(MessageTtl messageTtl) { this.messageTtl = messageTtl; return this; } + public ScheduledMessageBuilder sources(List sources) { + this.sources.clear(); + if (sources != null) { + this.sources.addAll(sources); + } + return this; + } + + public ScheduledMessageBuilder sources(String... sources) { + this.sources.clear(); + if (sources != null) { + Collections.addAll(this.sources, sources); + } + return this; + } + public Message build() { Validator.required(scheduleSubject, "Publish Subject is required."); Validator.required(targetSubject, "Target Subject is required."); @@ -215,6 +268,12 @@ public Message build() { if (messageTtl != null) { headers.put(NatsJetStreamConstants.NATS_SCHEDULE_TTL_HDR, messageTtl.getTtlString()); } + if (timezone != null) { + headers.put(NatsJetStreamConstants.NATS_SCHEDULE_TIME_ZONE_HDR, timezone); + } + if (sources.size() > 0) { + headers.put(NatsJetStreamConstants.NATS_SCHEDULE_SOURCE_HDR, sources); + } return NatsMessage.builder() .subject(scheduleSubject) From eb3ab8c4321a9c124bf183d0a4c63b659f938e1d Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 5 Mar 2026 13:51:44 -0500 Subject: [PATCH 2/2] Scheduling sources --- schedule-message/build.gradle | 40 ++++++++++++----------------------- 1 file changed, 13 insertions(+), 27 deletions(-) diff --git a/schedule-message/build.gradle b/schedule-message/build.gradle index 1f1fcf0..2a61b39 100644 --- a/schedule-message/build.gradle +++ b/schedule-message/build.gradle @@ -23,6 +23,10 @@ def jarAndArtifactName = "schedule-message" + jarEnd version = isRelease ? jarVersion : jarVersion + "-SNAPSHOT" // version is the variable the build actually uses. +System.out.println("Java: " + System.getProperty("java.version")) +System.out.println("Target Compatibility: " + targetCompat) +System.out.println(group + ":" + jarAndArtifactName + ":" + version) + java { sourceCompatibility = JavaVersion.VERSION_1_8 targetCompatibility = targetCompat @@ -32,7 +36,7 @@ repositories { mavenLocal() mavenCentral() maven { url="https://repo1.maven.org/maven2/" } - maven { url="https://central.sonatype.com/repository/maven-snapshots" } + maven { url="https://central.sonatype.com/repository/maven-snapshots/" } } dependencies { @@ -40,6 +44,7 @@ dependencies { implementation 'org.jspecify:jspecify:1.0.0' implementation 'io.synadia:counters:0.2.2' + testImplementation 'io.nats:jnats-server-runner:3.1.0' testImplementation 'org.junit.jupiter:junit-jupiter:5.14.1' testImplementation 'org.junit.platform:junit-platform-launcher:1.14.3' } @@ -49,9 +54,6 @@ sourceSets { java { srcDirs = ['src/main/java','src/examples/java'] } - resources { - srcDirs = ['src/examples/resources'] - } } test { java { @@ -62,7 +64,7 @@ sourceSets { tasks.register('bundle', Bundle) { from sourceSets.main.output - exclude("io/synadia/bt/examples/**") + exclude("io/synadia/examples/**") } jar { @@ -70,7 +72,8 @@ jar { bnd("Bundle-Name": "io.synadia.schedule.message", "Bundle-Vendor": "synadia.io", "Bundle-Description": "JetStream Scheduled Messages", - "Bundle-DocURL": "https://synadia.io" + "Bundle-DocURL": "https://github.com/synadia-io/orbit.java/tree/main/schedule-message", + "Target-Compatibility": "Java " + targetCompat ) } } @@ -78,17 +81,6 @@ jar { test { // Use junit platform for unit tests useJUnitPlatform() - testLogging { - exceptionFormat = 'full' - events "started", "passed", "skipped", "failed" - showStandardStreams = true - } - retry { - failOnPassedAfterRetry = false - maxFailures = 3 - maxRetries = 3 - } - systemProperty 'junit.jupiter.execution.timeout.default', '3m' } javadoc { @@ -107,7 +99,7 @@ tasks.register('examplesJar', Jar) { 'Implementation-Vendor': 'synadia.io') } from(sourceSets.main.output) { - include "io/synadia/bt/examples/**" + include "io/synadia/examples/**" } } @@ -121,13 +113,8 @@ tasks.register('sourcesJar', Jar) { from sourceSets.main.allSource } -tasks.register('testsJar', Jar) { - archiveClassifier.set('tests') - from sourceSets.test.allSource -} - artifacts { - archives javadocJar, sourcesJar, examplesJar, testsJar + archives javadocJar, sourcesJar, examplesJar } jacoco { @@ -165,10 +152,9 @@ publishing { artifact sourcesJar artifact examplesJar artifact javadocJar - artifact testsJar pom { name = jarAndArtifactName - packaging = "jar" + packaging = 'jar' groupId = group artifactId = jarAndArtifactName description = "Synadia Communications Inc. JetStream Scheduled Messages" @@ -188,7 +174,7 @@ publishing { } } scm { - url = "https://github.com/synadia-io/orbit.java" + url = 'https://github.com/synadia-io/orbit.java' } } }