From d38aa87d4d9a4812fc8cfa570cdbf780adc16cfc Mon Sep 17 00:00:00 2001 From: "guoqing.ge" Date: Mon, 23 Feb 2026 22:53:21 -0700 Subject: [PATCH] Enable variable output intervals via the `output_timelevels` stream attribute Introduce a new `output_timelevels` attribute for MPAS streams that enables variable output intervals. With this capability, we may outoput every 15 minutes in the first hour, every hour in the first 3 days, every 3 hours for the next 4 days, and every 6 hours in the last 3 days. We can also use this to only write out forecast files during a given period, such as: output_timelevels="6-12h" Check the PR description for details on how to specify the time levles. Here is a quick example: output_timelevels="0-3-15m 4-72 75-168-3 174-240-6" --- src/framework/mpas_stream_list_types.inc | 1 + src/framework/mpas_stream_manager.F | 779 +++++++++++++++++++- src/framework/mpas_stream_manager_types.inc | 3 +- src/framework/xml_stream_parser.c | 77 +- 4 files changed, 827 insertions(+), 33 deletions(-) diff --git a/src/framework/mpas_stream_list_types.inc b/src/framework/mpas_stream_list_types.inc index 30d88faa44..f1214a3f4d 100644 --- a/src/framework/mpas_stream_list_types.inc +++ b/src/framework/mpas_stream_list_types.inc @@ -17,6 +17,7 @@ character(len=StrKIND) :: filename character(len=StrKIND) :: filename_template character(len=StrKIND) :: filename_interval + character(len=StrKIND) :: output_timelevels = '' type (MPAS_Stream_type), pointer :: stream => null() integer :: timeLevel = 0 integer :: nRecords diff --git a/src/framework/mpas_stream_manager.F b/src/framework/mpas_stream_manager.F index d21dd04e4f..bbb052bb99 100644 --- a/src/framework/mpas_stream_manager.F +++ b/src/framework/mpas_stream_manager.F @@ -45,7 +45,9 @@ module mpas_stream_manager MPAS_stream_mgr_stream_exists, & MPAS_stream_mgr_get_stream_interval, & MPAS_get_stream_filename, & - MPAS_build_stream_filename + MPAS_build_stream_filename, & + get_output_interval_from_timelevels, & + get_next_timelevel_start !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ! @@ -1458,6 +1460,7 @@ subroutine MPAS_stream_mgr_reset_alarms(manager, streamID, direction, ierr)!{{{ type (MPAS_stream_list_type), pointer :: stream type (MPAS_stream_list_type), pointer :: alarm_cursor + type (MPAS_stream_list_type), pointer :: stream_cursor integer :: local_direction integer :: local_ierr, threadNum logical :: resetAlarms @@ -1507,6 +1510,8 @@ subroutine MPAS_stream_mgr_reset_alarms(manager, streamID, direction, ierr)!{{{ do while (associated(alarm_cursor)) if (mpas_is_alarm_ringing(manager % streamClock, alarm_cursor % name, ierr=local_ierr)) then call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr) + ! Update variable output alarms with new interval based on forecast hour + call update_variable_output_alarm(manager, stream, alarm_cursor % name, ierr=local_ierr) end if alarm_cursor => alarm_cursor % next end do @@ -1536,6 +1541,12 @@ subroutine MPAS_stream_mgr_reset_alarms(manager, streamID, direction, ierr)!{{{ do while (associated(alarm_cursor)) if (mpas_is_alarm_ringing(manager % streamClock, alarm_cursor % name, ierr=local_ierr)) then call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr) + ! Update variable output alarms for each stream associated with this alarm + stream_cursor => alarm_cursor % streamList % head + do while (associated(stream_cursor)) + call update_variable_output_alarm(manager, stream_cursor, alarm_cursor % name, ierr=local_ierr) + stream_cursor => stream_cursor % next + end do end if alarm_cursor => alarm_cursor % next end do @@ -1857,6 +1868,9 @@ subroutine MPAS_stream_mgr_set_property_char(manager, streamID, propertyName, pr case (MPAS_STREAM_PROPERTY_FILENAME_INTV) stream_cursor % filename_interval = propertyValue + case (MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS) + stream_cursor % output_timelevels = propertyValue + case (MPAS_STREAM_PROPERTY_REF_TIME) call mpas_set_time(stream_cursor % referenceTime, dateTimeString=propertyValue) @@ -2084,6 +2098,9 @@ subroutine MPAS_stream_mgr_get_property_char(manager, streamID, propertyName, pr case (MPAS_STREAM_PROPERTY_FILENAME_INTV) propertyValue = stream_cursor % filename_interval + case (MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS) + propertyValue = stream_cursor % output_timelevels + case (MPAS_STREAM_PROPERTY_REF_TIME) call mpas_get_time(stream_cursor % referenceTime, dateTimeString=propertyValue) @@ -3090,16 +3107,19 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite integer, intent(out) :: ierr type (MPAS_stream_list_type), pointer :: alarm_cursor - type (MPAS_Time_type) :: ref_time - type (MPAS_TimeInterval_type) :: temp_interval + type (MPAS_Time_type) :: ref_time, start_time, current_time, alarmTime_local + type (MPAS_TimeInterval_type) :: temp_interval, time_diff, alarmInterval_local type (MPAS_TimeInterval_type) :: filename_interval character (len=StrKIND) :: now_string, time_string character (len=StrKIND) :: temp_filename, actualWhen - character (len=StrKIND) :: err_string + character (len=StrKIND) :: err_string, timelevels_str logical :: ringing_alarm, recordSeek, swapRecords logical :: clobberRecords, clobberFiles, truncateFiles integer :: maxRecords, tempRecord integer :: local_ierr, threadNum + integer (kind=I8KIND) :: seconds_diff + real (kind=RKIND) :: forecast_hour, next_start_hour + real (kind=RKIND) :: interval_minutes, next_interval_minutes threadNum = mpas_threading_get_thread_num() @@ -3141,6 +3161,52 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite return end if + ! + ! If stream uses output_timelevels, check if current forecast hour is within defined range + ! This prevents output when the alarm rings but we're past the timelevels range + ! + timelevels_str = '' + call MPAS_stream_mgr_get_property(manager, trim(stream % name), & + MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS, & + timelevels_str, ierr=local_ierr) + if (len_trim(timelevels_str) > 0) then + ! Calculate current forecast hour + start_time = mpas_get_clock_time(manager % streamClock, MPAS_START_TIME, ierr=local_ierr) + current_time = mpas_get_clock_time(manager % streamClock, MPAS_NOW, ierr=local_ierr) + time_diff = current_time - start_time + call mpas_get_timeInterval(time_diff, S_i8=seconds_diff, ierr=local_ierr) + forecast_hour = real(seconds_diff, RKIND) / 3600.0_RKIND + + ! Check if forecast hour is within any defined timelevel range + call get_output_interval_from_timelevels(timelevels_str, forecast_hour, interval_minutes, local_ierr) + if (local_ierr /= MPAS_STREAM_MGR_NOERR) then + ! Outside defined timelevels range - check if there's a future range to schedule + call get_next_timelevel_start(timelevels_str, forecast_hour, next_start_hour, next_interval_minutes, local_ierr) + + alarm_cursor => stream % alarmList_out % head + do while (associated(alarm_cursor)) + if (mpas_is_alarm_ringing(manager % streamClock, alarm_cursor % name, ierr=local_ierr)) then + call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr) + call mpas_remove_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr) + + ! If there's a future range, schedule alarm for when it starts + if (next_start_hour > 0.0_RKIND) then + STREAM_DEBUG_WRITE('-- Stream '//trim(stream % name)//' rescheduling alarm for next timelevel range.') + call mpas_set_timeInterval(time_diff, dt=next_start_hour * 3600.0_RKIND, ierr=local_ierr) + alarmTime_local = start_time + time_diff + call mpas_set_timeInterval(alarmInterval_local, dt=next_interval_minutes * 60.0_RKIND, ierr=local_ierr) + call mpas_add_clock_alarm(manager % streamClock, alarm_cursor % name, alarmTime_local, & + alarmTimeInterval=alarmInterval_local, ierr=local_ierr) + else + STREAM_DEBUG_WRITE('-- Stream '//trim(stream % name)//' output complete: past all output_timelevels ranges.') + end if + end if + alarm_cursor => alarm_cursor % next + end do + return + end if + end if + ! ! Work out file clobbering options ! @@ -3169,7 +3235,11 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite ! stream, in which case we create the stream from scratch ! if (.not. stream % valid) then - if ( stream % filename_interval /= 'none' ) then + if ( stream % filename_interval == 'output' ) then + ! Use actual write time for filename (each output gets unique file) + call mpas_get_time(writeTime, dateTimeString=time_string) + call mpas_expand_string(time_string, blockID, stream % filename_template, stream % filename) + else if ( stream % filename_interval /= 'none' ) then call mpas_set_timeInterval(filename_interval, timeString=stream % filename_interval) call mpas_build_stream_filename(stream % referenceTime, writeTime, filename_interval, stream % filename_template, blockID, stream % filename, ierr=local_ierr) else @@ -3244,7 +3314,11 @@ subroutine write_stream(manager, stream, blockID, timeLevel, mgLevel, forceWrite stream % valid = .true. else - if ( stream % filename_interval /= 'none' ) then + if ( stream % filename_interval == 'output' ) then + ! Use actual write time for filename (each output gets unique file) + call mpas_get_time(writeTime, dateTimeString=time_string) + call mpas_expand_string(time_string, blockID, stream % filename_template, temp_filename) + else if ( stream % filename_interval /= 'none' ) then call mpas_set_timeInterval(filename_interval, timeString=stream % filename_interval) call mpas_build_stream_filename(stream % referenceTime, writeTime, filename_interval, stream % filename_template, blockID, temp_filename, ierr=local_ierr) else @@ -3675,7 +3749,10 @@ subroutine read_stream(manager, stream, timeLevel, mgLevel, forceReadNow, when, ! ! First we need to build the filename for the current read time. ! - if ( stream % filename_interval /= 'none' ) then + if ( stream % filename_interval == 'output' ) then + ! Use actual read time for filename + call mpas_expand_string(when, blockID_local, stream % filename_template, temp_filename) + else if ( stream % filename_interval /= 'none' ) then call mpas_set_time(now_time, dateTimeString=when, ierr=local_ierr) call mpas_set_timeInterval(filename_interval, timeString=stream % filename_interval) @@ -3775,7 +3852,9 @@ subroutine read_stream(manager, stream, timeLevel, mgLevel, forceReadNow, when, end if retestFile = .false. - if ( trim(stream % filename_interval) /= 'none' .and. whence /= MPAS_STREAM_EXACT_TIME ) then + if ( trim(stream % filename_interval) /= 'none' .and. & + trim(stream % filename_interval) /= 'output' .and. & + whence /= MPAS_STREAM_EXACT_TIME ) then currentTime = mpas_get_clock_time(manager % streamClock, MPAS_NOW, ierr=local_ierr) call mpas_set_timeInterval(filenameInterval, timeString=stream % filename_interval, ierr=local_ierr) @@ -3847,9 +3926,13 @@ subroutine read_stream(manager, stream, timeLevel, mgLevel, forceReadNow, when, STREAM_DEBUG_WRITE(' --- Retesting file... ') call mpas_get_time(filenameTime, dateTimeString=test_when) - call mpas_set_timeInterval(filename_interval, timeString=stream % filename_interval) - - call mpas_build_stream_filename(stream % referenceTime, filenameTime, filename_interval, stream % filename_template, blockID_local, test_filename, ierr=local_ierr) + if ( stream % filename_interval == 'output' ) then + ! Use actual time for filename + call mpas_expand_string(test_when, blockID_local, stream % filename_template, test_filename) + else + call mpas_set_timeInterval(filename_interval, timeString=stream % filename_interval) + call mpas_build_stream_filename(stream % referenceTime, filenameTime, filename_interval, stream % filename_template, blockID_local, test_filename, ierr=local_ierr) + end if STREAM_DEBUG_WRITE(' --- Retesting filename is ' // trim(test_filename)) @@ -4080,7 +4163,11 @@ subroutine mpas_get_stream_filename(manager, streamID, when, blockID, filename, ! ! First we need to build the filename for the current read time. ! - if ( streamCursor % filename_interval /= 'none' ) then + if ( streamCursor % filename_interval == 'output' ) then + ! Use actual time for filename + call mpas_get_time(now_time, dateTimeString=when_string, ierr=err_local) + call mpas_expand_string(when_string, blockID_local, streamCursor % filename_template, filename) + else if ( streamCursor % filename_interval /= 'none' ) then call mpas_set_timeInterval(filename_interval, timeString=streamCursor % filename_interval) call mpas_build_stream_filename(streamCursor % referenceTime, now_time, filename_interval, streamCursor % filename_template, blockID_local, filename, ierr=err_local) else @@ -4303,7 +4390,7 @@ end subroutine gen_random ! ! Write output_interval to stream ! - IF (stream %filename_interval(1:4) /= "none") THEN + IF (stream %filename_interval(1:4) /= "none" .and. stream %filename_interval /= "output") THEN call mpas_set_timeInterval(filename_interval, timeString=stream %filename_interval) call mpas_get_timeInterval(filename_interval,M=output_interval) call mpas_writeStreamAtt(stream%stream, 'output_interval',output_interval,syncVal=.true., ierr=local_ierr) @@ -5922,6 +6009,531 @@ logical function MPAS_stream_mgr_stream_exists(manager, streamID) result(validSt end function MPAS_stream_mgr_stream_exists!}}} + !||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + ! + ! parse_time_string + ! + !> \brief Parse a time string into total minutes ( allow 0.5 minutes, i.e. 30 seconds) + !> \author Guoqing Ge + !> \date February 2026 + !> \details + !> Parses time strings like "1h30m", "45m", "6", "90s", "1d" into total minutes + !> Supported units: d/D (days), h (hours), m (minutes), s (seconds) + !> Plain integers without units are interpreted as hours + ! + !----------------------------------------------------------------------- + subroutine parse_time_string(time_str, total_minutes, ierr)!{{{ + + implicit none + + character(len=*), intent(in) :: time_str + real(kind=RKIND), intent(out) :: total_minutes + integer, intent(out) :: ierr + + character(len=StrKIND) :: str_local + integer :: i, len_str, num_start, read_err + real(kind=RKIND) :: value + character :: ch + logical :: found_unit + + ierr = 0 + total_minutes = 0.0_RKIND + str_local = trim(adjustl(time_str)) + len_str = len_trim(str_local) + + if (len_str == 0) then + ierr = 1 + return + end if + + ! Check if last character is a digit (plain integer = hours) + ch = str_local(len_str:len_str) + if (ch >= '0' .and. ch <= '9') then + read(str_local, *, iostat=read_err) value + if (read_err /= 0) then + ierr = 1 + return + end if + total_minutes = value * 60.0_RKIND + return + end if + + ! Parse duration with units (e.g., "1h30m", "45m", "90s", "1d" or "1D") + num_start = 1 + i = 1 + do while (i <= len_str) + ch = str_local(i:i) + ! Check for unit characters: d/D (days), h (hours), m (minutes), s (seconds) + if (ch == 'h' .or. ch == 'm' .or. ch == 's' .or. ch == 'd' .or. ch == 'D') then + + if (i == num_start) then + ierr = 1 ! No number before unit + return + end if + + read(str_local(num_start:i-1), *, iostat=read_err) value + if (read_err /= 0) then + ierr = 1 + return + end if + + select case (ch) + case ('h') + total_minutes = total_minutes + value * 60.0_RKIND + case ('m') + total_minutes = total_minutes + value + case ('s') + total_minutes = total_minutes + value / 60.0_RKIND + case ('d', 'D') + total_minutes = total_minutes + value * 24.0_RKIND * 60.0_RKIND + end select + + num_start = i + 1 + end if + i = i + 1 + end do + + ! Check for trailing digits without unit (error) + if (num_start <= len_str) then + ierr = 1 + return + end if + + end subroutine parse_time_string!}}} + + + !||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + ! + ! parse_output_timelevel_spec + ! + !> \brief Parse one time level specification segment + !> \author Guoqing Ge + !> \date February 2026 + !> \details + !> Parses a single segment using format: start, start-stop, or start-stop-step + !> Time strings can be integers (hours) or duration format (e.g., "1h30m", "45m") + !> Examples: "6", "0-3", "0-1h-15m", "1h30m-2h-15m" + ! + !----------------------------------------------------------------------- + subroutine parse_output_timelevel_spec(spec, start_hour, end_hour, interval_minutes, ierr)!{{{ + + implicit none + + character(len=*), intent(in) :: spec + real(kind=RKIND), intent(out) :: start_hour, end_hour + real(kind=RKIND), intent(out) :: interval_minutes + integer, intent(out) :: ierr + + character(len=StrKIND) :: spec_local + integer :: i, dash_count, dash_pos(2), len_spec + real(kind=RKIND) :: start_minutes, end_minutes, step_minutes + integer :: local_ierr + + ierr = 0 + dash_pos(1) = 0 + dash_pos(2) = 0 + spec_local = trim(adjustl(spec)) + len_spec = len_trim(spec_local) + + ! Guard against empty string + if (len_spec == 0) then + ierr = 1 + return + end if + + ! Count dashes to determine format + dash_count = 0 + do i = 1, len_spec + if (spec_local(i:i) == '-') then + dash_count = dash_count + 1 + if (dash_count <= 2) dash_pos(dash_count) = i + end if + end do + + ! Parse based on number of dashes + if (dash_count == 0) then + ! Format: single time string (output at that time only) + call parse_time_string(spec_local, start_minutes, local_ierr) + if (local_ierr /= 0) then + ierr = 1 + return + end if + start_hour = start_minutes / 60.0_RKIND + end_hour = start_hour + interval_minutes = 60.0_RKIND ! Default, but won't matter for single time + + else if (dash_count == 1) then + ! Format: start-stop (interval defaults to 1 hour) + if (dash_pos(1) <= 1 .or. dash_pos(1) >= len_spec) then + ierr = 1 + return + end if + + call parse_time_string(spec_local(1:dash_pos(1)-1), start_minutes, local_ierr) + if (local_ierr /= 0) then + ierr = 1 + return + end if + + call parse_time_string(spec_local(dash_pos(1)+1:len_spec), end_minutes, local_ierr) + if (local_ierr /= 0) then + ierr = 1 + return + end if + + start_hour = start_minutes / 60.0_RKIND + end_hour = end_minutes / 60.0_RKIND + interval_minutes = 60.0_RKIND + + else if (dash_count == 2) then + ! Format: start-stop-step + if (dash_pos(1) <= 1 .or. dash_pos(2) <= dash_pos(1) + 1 .or. dash_pos(2) >= len_spec) then + ierr = 1 + return + end if + + call parse_time_string(spec_local(1:dash_pos(1)-1), start_minutes, local_ierr) + if (local_ierr /= 0) then + ierr = 1 + return + end if + + call parse_time_string(spec_local(dash_pos(1)+1:dash_pos(2)-1), end_minutes, local_ierr) + if (local_ierr /= 0) then + ierr = 1 + return + end if + + call parse_time_string(spec_local(dash_pos(2)+1:len_spec), step_minutes, local_ierr) + if (local_ierr /= 0) then + ierr = 1 + return + end if + + start_hour = start_minutes / 60.0_RKIND + end_hour = end_minutes / 60.0_RKIND + interval_minutes = step_minutes + + else + ! More than 2 dashes - invalid format + ierr = 1 + return + end if + + end subroutine parse_output_timelevel_spec!}}} + + + !||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + ! + ! get_output_interval_from_timelevels + ! + !> \brief Calculate output interval based on current forecast hour and timelevels + !> \author Guoqing Ge + !> \date February 2026 + !> \details + !> Given a time levels string and current forecast hour, determines the appropriate + !> output interval in minutes + ! + !----------------------------------------------------------------------- + subroutine get_output_interval_from_timelevels(timelevels_str, forecast_hour, interval_minutes, ierr)!{{{ + + implicit none + + character(len=*), intent(in) :: timelevels_str + real(kind=RKIND), intent(in) :: forecast_hour + real(kind=RKIND), intent(out) :: interval_minutes + integer, intent(out) :: ierr + + character(len=StrKIND) :: work_str, segment + integer :: seg_start, seg_end, i, local_ierr, work_len + real(kind=RKIND) :: start_hour, end_hour + real(kind=RKIND) :: seg_interval_minutes + real(kind=RKIND) :: next_time, min_next_time + logical :: found, is_single_time + + ierr = 0 + interval_minutes = 60.0_RKIND ! Default to 1 hour + found = .false. + is_single_time = .false. + min_next_time = huge(1.0_RKIND) + work_str = trim(adjustl(timelevels_str)) + work_len = len_trim(work_str) + + ! Guard against empty string + if (work_len == 0) then + ierr = 1 + return + end if + + ! First pass: find if current forecast hour matches any segment + seg_start = 1 + do while (seg_start <= work_len) + ! Find end of current segment (next space or end of string) + seg_end = seg_start + do while (seg_end <= work_len .and. work_str(seg_end:seg_end) /= ' ') + seg_end = seg_end + 1 + end do + seg_end = seg_end - 1 + + if (seg_end >= seg_start) then + segment = work_str(seg_start:seg_end) + call parse_output_timelevel_spec(segment, start_hour, end_hour, seg_interval_minutes, local_ierr) + + if (local_ierr == 0) then + ! Check if current forecast hour falls in this range + if (forecast_hour >= start_hour .and. forecast_hour <= end_hour) then + found = .true. + ! Check if this is a single time point (not a range) + if (abs(start_hour - end_hour) < 1.0e-6_RKIND) then + is_single_time = .true. + else + ! It's a range - use the segment's interval + interval_minutes = seg_interval_minutes + return + end if + end if + else + ierr = 1 + return + end if + end if + + ! Move to next segment + seg_start = seg_end + 1 + do while (seg_start <= work_len .and. work_str(seg_start:seg_start) == ' ') + seg_start = seg_start + 1 + end do + end do + + ! If we matched a single time, find the next output time + if (found .and. is_single_time) then + ! Second pass: find the minimum time > forecast_hour + seg_start = 1 + do while (seg_start <= work_len) + seg_end = seg_start + do while (seg_end <= work_len .and. work_str(seg_end:seg_end) /= ' ') + seg_end = seg_end + 1 + end do + seg_end = seg_end - 1 + + if (seg_end >= seg_start) then + segment = work_str(seg_start:seg_end) + call parse_output_timelevel_spec(segment, start_hour, end_hour, seg_interval_minutes, local_ierr) + + if (local_ierr == 0) then + ! For single times, check if > current + if (abs(start_hour - end_hour) < 1.0e-6_RKIND) then + if (start_hour > forecast_hour + 1.0e-6_RKIND .and. start_hour < min_next_time) then + min_next_time = start_hour + end if + else + ! For ranges, check if start is after current + if (start_hour > forecast_hour + 1.0e-6_RKIND .and. start_hour < min_next_time) then + min_next_time = start_hour + end if + end if + end if + end if + + seg_start = seg_end + 1 + do while (seg_start <= work_len .and. work_str(seg_start:seg_start) == ' ') + seg_start = seg_start + 1 + end do + end do + + ! Calculate interval to next time + if (min_next_time < huge(1.0_RKIND)) then + interval_minutes = (min_next_time - forecast_hour) * 60.0_RKIND + else + ! No more times after this - but we DID match, so ierr stays 0 + ! interval_minutes = 0 signals this is the last output + interval_minutes = 0.0_RKIND + end if + return + end if + + ! If no matching range found, signal that no more output is needed + if (.not. found) then + interval_minutes = 0.0_RKIND + ierr = MPAS_STREAM_MGR_ERROR ! Signal no more output + end if + + end subroutine get_output_interval_from_timelevels!}}} + + + !||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + ! + ! get_next_timelevel_start + ! + !> \brief Find the start hour of the next timelevel range after current forecast hour + !> \author Guoqing Ge + !> \date February 2026 + !> \details + !> Given a time levels string and current forecast hour, finds the start hour + !> of the next timelevel range (if any). Returns -1 if no future ranges exist. + ! + !----------------------------------------------------------------------- + subroutine get_next_timelevel_start(timelevels_str, forecast_hour, next_start_hour, next_interval_minutes, ierr)!{{{ + + implicit none + + character(len=*), intent(in) :: timelevels_str + real(kind=RKIND), intent(in) :: forecast_hour + real(kind=RKIND), intent(out) :: next_start_hour + real(kind=RKIND), intent(out) :: next_interval_minutes + integer, intent(out) :: ierr + + character(len=StrKIND) :: work_str, segment + integer :: seg_start, seg_end, local_ierr, work_len + real(kind=RKIND) :: start_hour, end_hour, best_start + real(kind=RKIND) :: seg_interval_minutes, best_interval + logical :: found + + ierr = 0 + next_start_hour = -1.0_RKIND + next_interval_minutes = 60.0_RKIND + best_start = huge(1.0_RKIND) + best_interval = 60.0_RKIND + found = .false. + work_str = trim(adjustl(timelevels_str)) + work_len = len_trim(work_str) + + if (work_len == 0) then + ierr = 1 + return + end if + + ! Parse each space-separated segment + seg_start = 1 + do while (seg_start <= work_len) + seg_end = seg_start + do while (seg_end <= work_len .and. work_str(seg_end:seg_end) /= ' ') + seg_end = seg_end + 1 + end do + seg_end = seg_end - 1 + + if (seg_end >= seg_start) then + segment = work_str(seg_start:seg_end) + call parse_output_timelevel_spec(segment, start_hour, end_hour, seg_interval_minutes, local_ierr) + + if (local_ierr == 0) then + ! Check if this range starts after current forecast hour + if (start_hour > forecast_hour .and. start_hour < best_start) then + best_start = start_hour + best_interval = seg_interval_minutes + found = .true. + end if + end if + end if + + seg_start = seg_end + 1 + do while (seg_start <= work_len .and. work_str(seg_start:seg_start) == ' ') + seg_start = seg_start + 1 + end do + end do + + if (found) then + next_start_hour = best_start + next_interval_minutes = best_interval + else + ierr = MPAS_STREAM_MGR_ERROR ! No future ranges + end if + + end subroutine get_next_timelevel_start!}}} + + + !||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| + ! + ! update_variable_output_alarm + ! + !> \brief Update variable output alarm with new interval based on forecast hour + !> \author Guoqing Ge + !> \date February 2026 + !> \details + !> This routine updates a variable output alarm after it has rung by: + !> 1. Checking if the stream uses output_timelevels + !> 2. Calculating current forecast hour (current time - start time) + !> 3. Getting the next interval from get_output_interval_from_timelevels + !> 4. Removing the old alarm and creating a new one with the updated interval + ! + !----------------------------------------------------------------------- + subroutine update_variable_output_alarm(manager, stream, alarm_name, ierr)!{{{ + + implicit none + + type (MPAS_streamManager_type), intent(inout) :: manager + type (MPAS_stream_list_type), pointer :: stream + character(len=*), intent(in) :: alarm_name + integer, intent(out), optional :: ierr + + type (MPAS_Time_type) :: start_time, current_time, alarmTime_local + type (MPAS_TimeInterval_type) :: time_diff, alarmInterval_local + real (kind=RKIND) :: forecast_hour, next_start_hour + real (kind=RKIND) :: interval_minutes, next_interval_minutes + integer :: local_ierr, ierr_tmp + integer (kind=I8KIND) :: seconds_diff + character(len=StrKIND) :: timelevels_str + + local_ierr = MPAS_STREAM_MGR_NOERR + + ! Check if stream uses output_timelevels + timelevels_str = '' + call MPAS_stream_mgr_get_property(manager, trim(stream % name), & + MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS, & + timelevels_str, ierr=ierr_tmp) + + if (len_trim(timelevels_str) == 0) then + ! Not a variable output stream, nothing to do + if (present(ierr)) ierr = MPAS_STREAM_MGR_NOERR + return + end if + + ! Calculate current forecast hour + start_time = mpas_get_clock_time(manager % streamClock, MPAS_START_TIME, ierr=ierr_tmp) + current_time = mpas_get_clock_time(manager % streamClock, MPAS_NOW, ierr=ierr_tmp) + time_diff = current_time - start_time + call mpas_get_timeInterval(time_diff, S_i8=seconds_diff, ierr=ierr_tmp) + forecast_hour = real(seconds_diff, RKIND) / 3600.0_RKIND + + ! Get next interval + call get_output_interval_from_timelevels(timelevels_str, forecast_hour, interval_minutes, ierr_tmp) + if (ierr_tmp /= MPAS_STREAM_MGR_NOERR .or. interval_minutes <= 0.0_RKIND) then + ! Current hour not in any range - check for future range + call get_next_timelevel_start(timelevels_str, forecast_hour, next_start_hour, next_interval_minutes, ierr_tmp) + call mpas_remove_clock_alarm(manager % streamClock, alarm_name, ierr=ierr_tmp) + + if (next_start_hour > 0.0_RKIND) then + ! Schedule alarm for when the next range starts + call mpas_set_timeInterval(time_diff, dt=next_start_hour * 3600.0_RKIND, ierr=ierr_tmp) + alarmTime_local = start_time + time_diff + call mpas_set_timeInterval(alarmInterval_local, dt=next_interval_minutes * 60.0_RKIND, ierr=ierr_tmp) + call mpas_add_clock_alarm(manager % streamClock, alarm_name, alarmTime_local, & + alarmTimeInterval=alarmInterval_local, ierr=ierr_tmp) + end if + if (present(ierr)) ierr = MPAS_STREAM_MGR_NOERR + return + end if + + ! Convert minutes to time interval (using dt in seconds for sub-minute precision) + call mpas_set_timeInterval(alarmInterval_local, dt=interval_minutes * 60.0_RKIND, ierr=ierr_tmp) + local_ierr = ior(local_ierr, ierr_tmp) + + ! Remove old alarm + call mpas_remove_clock_alarm(manager % streamClock, alarm_name, ierr=ierr_tmp) + local_ierr = ior(local_ierr, ierr_tmp) + + ! Add new alarm with updated interval + ! Set alarm time to current_time + interval so next ring is in the future + alarmTime_local = current_time + alarmInterval_local + call mpas_add_clock_alarm(manager % streamClock, alarm_name, alarmTime_local, & + alarmTimeInterval=alarmInterval_local, ierr=ierr_tmp) + local_ierr = ior(local_ierr, ierr_tmp) + + if (present(ierr)) ierr = local_ierr + + end subroutine update_variable_output_alarm!}}} + + end module mpas_stream_manager @@ -6342,3 +6954,144 @@ subroutine stream_mgr_add_pkg_c(manager_c, streamID_c, package_c, ierr_c) bind(c end if end subroutine stream_mgr_add_pkg_c !}}} + + +subroutine stream_mgr_set_property_c(manager_c, streamID_c, propertyName_c, propertyValue_c, ierr_c) bind(c) !{{{ + + use mpas_c_interfacing, only : mpas_c_to_f_string + use iso_c_binding, only : c_char, c_int, c_ptr, c_f_pointer + use mpas_derived_types, only : MPAS_streamManager_type, MPAS_STREAM_MGR_NOERR, & + MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS + use mpas_stream_manager, only : MPAS_stream_mgr_set_property + use mpas_kind_types, only : StrKIND + + implicit none + + type (c_ptr) :: manager_c + character(kind=c_char) :: streamID_c(*) + character(kind=c_char) :: propertyName_c(*) + character(kind=c_char) :: propertyValue_c(*) + integer(kind=c_int) :: ierr_c + + type (MPAS_streamManager_type), pointer :: manager + character(len=StrKIND) :: streamID, propertyName, propertyValue + integer :: ierr + + + ierr = 0 + + call c_f_pointer(manager_c, manager) + call mpas_c_to_f_string(streamID_c, streamID) + call mpas_c_to_f_string(propertyName_c, propertyName) + call mpas_c_to_f_string(propertyValue_c, propertyValue) + + ! Map property name string to constant + if (trim(propertyName) == 'output_timelevels') then + call MPAS_stream_mgr_set_property(manager, streamID, MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS, propertyValue, ierr=ierr) + end if + + if (ierr == MPAS_STREAM_MGR_NOERR) then + ierr_c = 0 + else + ierr_c = 1 + end if + +end subroutine stream_mgr_set_property_c !}}} + + +subroutine stream_mgr_add_variable_output_alarm_c(manager_c, streamID_c, ierr_c) bind(c) !{{{ + + use mpas_c_interfacing, only : mpas_c_to_f_string + use iso_c_binding, only : c_char, c_int, c_ptr, c_f_pointer + use mpas_derived_types, only : MPAS_streamManager_type, MPAS_Clock_type, MPAS_Time_type, MPAS_TimeInterval_type, & + MPAS_STREAM_MGR_NOERR, MPAS_STREAM_OUTPUT, MPAS_START_TIME, & + MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS, MPAS_STREAM_PROPERTY_REF_TIME + use mpas_stream_manager, only : MPAS_stream_mgr_get_clock, MPAS_stream_mgr_add_alarm, & + MPAS_stream_mgr_get_property, get_output_interval_from_timelevels, & + get_next_timelevel_start + use mpas_kind_types, only : StrKIND, RKIND + use mpas_timekeeping, only : mpas_add_clock_alarm, mpas_get_clock_time, mpas_set_time, mpas_set_timeInterval, & + mpas_get_timeInterval, operator(-), operator(+) + + implicit none + + type (c_ptr) :: manager_c + character(kind=c_char) :: streamID_c(*) + integer(kind=c_int) :: ierr_c + + type (MPAS_streamManager_type), pointer :: manager + type (MPAS_Clock_type), pointer :: clock + character(len=StrKIND) :: streamID, alarmID, timelevels_str, ref_time_str + type (MPAS_Time_type) :: alarmTime_local, ref_time, current_time, start_time + type (MPAS_TimeInterval_type) :: alarmInterval_local, time_since_ref, time_offset + integer :: ierr, ierr_tmp + real(kind=RKIND) :: forecast_hour, next_start_hour + real(kind=RKIND) :: interval_minutes, next_interval_minutes + + + ierr = 0 + ierr_tmp = 0 + + call c_f_pointer(manager_c, manager) + call mpas_c_to_f_string(streamID_c, streamID) + write(alarmID, '(a)') trim(streamID)//'_output' + + ! Get the output_timelevels property + call MPAS_stream_mgr_get_property(manager, streamID, MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS, timelevels_str, ierr=ierr_tmp) + ierr = ior(ierr, ierr_tmp) + + if (ierr /= 0 .or. trim(timelevels_str) == '') then + ierr_c = 1 + return + end if + + call MPAS_stream_mgr_get_clock(manager, clock) + + ! Get reference time for the stream + call MPAS_stream_mgr_get_property(manager, streamID, MPAS_STREAM_PROPERTY_REF_TIME, ref_time_str, ierr=ierr_tmp) + ierr = ior(ierr, ierr_tmp) + call mpas_set_time(ref_time, dateTimeString=ref_time_str, ierr=ierr_tmp) + ierr = ior(ierr, ierr_tmp) + + ! For initial alarm, get start time + start_time = mpas_get_clock_time(clock, MPAS_START_TIME, ierr=ierr_tmp) + ierr = ior(ierr, ierr_tmp) + + ! Get interval for first output (at forecast hour 0) + forecast_hour = 0.0_RKIND + call get_output_interval_from_timelevels(timelevels_str, forecast_hour, interval_minutes, ierr_tmp) + + if (ierr_tmp /= MPAS_STREAM_MGR_NOERR .or. interval_minutes <= 0.0_RKIND) then + ! Hour 0 not in any range - find the first range's start time + call get_next_timelevel_start(timelevels_str, forecast_hour, next_start_hour, next_interval_minutes, ierr_tmp) + if (ierr_tmp /= MPAS_STREAM_MGR_NOERR .or. next_start_hour < 0.0_RKIND) then + ! No valid ranges at all + ierr_c = 1 + return + end if + ! Schedule alarm for when the first range starts + call mpas_set_timeInterval(time_offset, dt=next_start_hour * 3600.0_RKIND, ierr=ierr_tmp) + alarmTime_local = start_time + time_offset + call mpas_set_timeInterval(alarmInterval_local, dt=next_interval_minutes * 60.0_RKIND, ierr=ierr_tmp) + else + ! Hour 0 is in a valid range - start alarm at start time + alarmTime_local = start_time + call mpas_set_timeInterval(alarmInterval_local, dt=interval_minutes * 60.0_RKIND, ierr=ierr_tmp) + end if + ierr = ior(ierr, ierr_tmp) + + ! Add the alarm + call mpas_add_clock_alarm(clock, alarmID, alarmTime_local, alarmTimeInterval=alarmInterval_local, ierr=ierr_tmp) + ierr = ior(ierr, ierr_tmp) + + ! Register alarm with stream manager + call MPAS_stream_mgr_add_alarm(manager, streamID, alarmID, MPAS_STREAM_OUTPUT, ierr=ierr_tmp) + ierr = ior(ierr, ierr_tmp) + + if (ierr == MPAS_STREAM_MGR_NOERR) then + ierr_c = 0 + else + ierr_c = 1 + end if + +end subroutine stream_mgr_add_variable_output_alarm_c !}}} diff --git a/src/framework/mpas_stream_manager_types.inc b/src/framework/mpas_stream_manager_types.inc index 1527c5411a..f0c3809865 100644 --- a/src/framework/mpas_stream_manager_types.inc +++ b/src/framework/mpas_stream_manager_types.inc @@ -21,7 +21,8 @@ MPAS_STREAM_PROPERTY_FILENAME_INTV = 11, & MPAS_STREAM_PROPERTY_CLOBBER = 12, & MPAS_STREAM_PROPERTY_IOTYPE = 13, & - MPAS_STREAM_PROPERTY_GATTR_UPDATE = 14 + MPAS_STREAM_PROPERTY_GATTR_UPDATE = 14, & + MPAS_STREAM_PROPERTY_OUTPUT_TIMELEVELS = 15 integer, public, parameter :: MPAS_STREAM_CLOBBER_NEVER = 100, & MPAS_STREAM_CLOBBER_APPEND = 101, & diff --git a/src/framework/xml_stream_parser.c b/src/framework/xml_stream_parser.c index 86a3249e06..7ed16e378b 100644 --- a/src/framework/xml_stream_parser.c +++ b/src/framework/xml_stream_parser.c @@ -31,6 +31,8 @@ void stream_mgr_add_immutable_stream_fields_c(void *, const char *, const char * void stream_mgr_add_pool_c(void *, const char *, const char *, const char *, int *); void stream_mgr_add_alarm_c(void *, const char *, const char *, const char *, const char *, int *); void stream_mgr_add_pkg_c(void *, const char *, const char *, int *); +void stream_mgr_set_property_c(void *, const char *, const char *, const char *, int *); +void stream_mgr_add_variable_output_alarm_c(void *, const char *, int *); /* @@ -347,7 +349,7 @@ int par_read(char *fname, int *mpi_comm, char **xml_buf, size_t *bufsize) *********************************************************************************/ int attribute_check(ezxml_t stream) { - const char *s_name, *s_type, *s_filename, *s_filename_intv, *s_input, *s_output, *s_ref_time; + const char *s_name, *s_type, *s_filename, *s_filename_intv, *s_input, *s_output, *s_ref_time, *s_output_timelevels; char msgbuf[MSGSIZE]; int i, len, nextchar; @@ -358,6 +360,7 @@ int attribute_check(ezxml_t stream) s_input = ezxml_attr(stream, "input_interval"); s_output = ezxml_attr(stream, "output_interval"); s_ref_time = ezxml_attr(stream, "reference_time"); + s_output_timelevels = ezxml_attr(stream, "output_timelevels"); /* @@ -380,15 +383,20 @@ int attribute_check(ezxml_t stream) /* - * Check that input streams have an input interval, output streams have an output interval + * Check that input streams have an input interval, output streams have an output interval or output_timelevels */ if (strstr(s_type, "input") != NULL && s_input == NULL) { snprintf(msgbuf, MSGSIZE, "stream \"%s\" is an input stream and must have the \"input_interval\" attribute.", s_name); fmt_err(msgbuf); return 1; } - if (strstr(s_type, "output") != NULL && s_output == NULL) { - snprintf(msgbuf, MSGSIZE, "stream \"%s\" is an output stream and must have the \"output_interval\" attribute.", s_name); + if (strstr(s_type, "output") != NULL && s_output == NULL && s_output_timelevels == NULL) { + snprintf(msgbuf, MSGSIZE, "stream \"%s\" is an output stream and must have either the \"output_interval\" or \"output_timelevels\" attribute.", s_name); + fmt_err(msgbuf); + return 1; + } + if (s_output != NULL && s_output_timelevels != NULL) { + snprintf(msgbuf, MSGSIZE, "stream \"%s\" cannot have both \"output_interval\" and \"output_timelevels\" attributes.", s_name); fmt_err(msgbuf); return 1; } @@ -1433,6 +1441,7 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) /* Next, handle modifications to mutable streams as well as new stream definitions */ immutable = 0; for (stream_xml = ezxml_child(streams, "stream"); stream_xml; stream_xml = ezxml_next(stream_xml)) { + const char *output_timelevels; streamID = ezxml_attr(stream_xml, "name"); direction = ezxml_attr(stream_xml, "type"); filename_template = ezxml_attr(stream_xml, "filename_template"); @@ -1441,6 +1450,7 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) interval_in2 = ezxml_attr(stream_xml, "input_interval"); interval_out = ezxml_attr(stream_xml, "output_interval"); interval_out2 = ezxml_attr(stream_xml, "output_interval"); + output_timelevels = ezxml_attr(stream_xml, "output_timelevels"); reference_time = ezxml_attr(stream_xml, "reference_time"); record_interval = ezxml_attr(stream_xml, "record_interval"); precision = ezxml_attr(stream_xml, "precision"); @@ -1481,21 +1491,27 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) if ( strstr(direction, "input") != NULL && strstr(direction, "output") != NULL ) { /* If input interval is an interval (i.e. not initial_only/final_only or none) set filename_interval to the interval. */ - if ( strstr(interval_in, "initial_only") == NULL && strstr(interval_in, "final_only") == NULL && strstr(interval_in, "none") == NULL ){ + if ( interval_in && strstr(interval_in, "initial_only") == NULL && strstr(interval_in, "final_only") == NULL && strstr(interval_in, "none") == NULL ){ filename_interval = interval_in2; /* If output interval is an interval (i.e. not initial_only/final_only or none) set filename_interval to the interval. */ - } else if ( strstr(interval_out, "initial_only") == NULL && strstr(interval_out, "final_only") == NULL && strstr(interval_out, "none") == NULL ){ + } else if ( interval_out && strstr(interval_out, "initial_only") == NULL && strstr(interval_out, "final_only") == NULL && strstr(interval_out, "none") == NULL ){ filename_interval = interval_out2; + /* If output_timelevels is set, use 'output' to get unique filename for each write */ + } else if ( output_timelevels != NULL ) { + filename_interval = "output"; } /* Check for an input stream. */ } else if ( strstr(direction, "input") != NULL ) { - if ( strstr(interval_in, "initial_only") == NULL && strstr(interval_in, "final_only") == NULL && strstr(interval_in, "none") == NULL ){ + if ( interval_in && strstr(interval_in, "initial_only") == NULL && strstr(interval_in, "final_only") == NULL && strstr(interval_in, "none") == NULL ){ filename_interval = interval_in2; } /* Check for an output stream. */ } else if ( strstr(direction, "output") != NULL ) { - if ( strstr(interval_out, "initial_only") == NULL && strstr(interval_out, "final_only") == NULL && strstr(interval_out, "none") == NULL ){ + if ( interval_out && strstr(interval_out, "initial_only") == NULL && strstr(interval_out, "final_only") == NULL && strstr(interval_out, "none") == NULL ){ filename_interval = interval_out2; + /* If output_timelevels is set, use 'output' to get unique filename for each write */ + } else if ( output_timelevels != NULL ) { + filename_interval = "output"; } } } else { @@ -1507,13 +1523,13 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) * to force it's value to be none as well. */ if ( strstr(filename_interval, "input_interval") != NULL ) { - if ( strstr(interval_in, "initial_only") == NULL && strstr(interval_in, "final_only") == NULL && strstr(interval_in, "none") == NULL ) { + if ( interval_in && strstr(interval_in, "initial_only") == NULL && strstr(interval_in, "final_only") == NULL && strstr(interval_in, "none") == NULL ) { filename_interval = interval_in2; } else { filename_interval = NULL; } } else if ( strstr(filename_interval, "output_interval") != NULL ) { - if ( strstr(interval_out, "initial_only") == NULL && strstr(interval_out, "final_only") == NULL && strstr(interval_out, "none") == NULL ) { + if ( interval_out && strstr(interval_out, "initial_only") == NULL && strstr(interval_out, "final_only") == NULL && strstr(interval_out, "none") == NULL ) { filename_interval = interval_out2; } else { filename_interval = NULL; @@ -1696,6 +1712,17 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) return; } + /* If output_timelevels is specified, set it as a property */ + if (output_timelevels != NULL) { + stream_mgr_set_property_c(manager, streamID, "output_timelevels", output_timelevels, &err); + if (err != 0) { + *status = 1; + return; + } + snprintf(msgbuf, MSGSIZE, " %-20s%s", "output timelevels:", output_timelevels); + mpas_log_write_c(msgbuf, "MPAS_LOG_OUT"); + } + /* Possibly add an input alarm for this stream */ if (itype == 3 || itype == 1) { stream_mgr_add_alarm_c(manager, streamID, "input", "start", interval_in2, &err); @@ -1714,16 +1741,28 @@ void xml_stream_parser(char *fname, void *manager, int *mpi_comm, int *status) /* Possibly add an output alarm for this stream */ if (itype == 3 || itype == 2) { - stream_mgr_add_alarm_c(manager, streamID, "output", "start", interval_out2, &err); - if (err != 0) { - *status = 1; - return; - } - if ( strcmp(interval_out, interval_out2) != 0 ) { - snprintf(msgbuf, MSGSIZE, " %-20s%s (%s)", "output alarm:", interval_out, interval_out2); - mpas_log_write_c(msgbuf, "MPAS_LOG_OUT"); + /* If output_timelevels is specified, use variable alarm; otherwise use fixed interval */ + if (output_timelevels == NULL) { + stream_mgr_add_alarm_c(manager, streamID, "output", "start", interval_out2, &err); + if (err != 0) { + *status = 1; + return; + } + if ( strcmp(interval_out, interval_out2) != 0 ) { + snprintf(msgbuf, MSGSIZE, " %-20s%s (%s)", "output alarm:", interval_out, interval_out2); + mpas_log_write_c(msgbuf, "MPAS_LOG_OUT"); + } else { + snprintf(msgbuf, MSGSIZE, " %-20s%s", "output alarm:", interval_out); + mpas_log_write_c(msgbuf, "MPAS_LOG_OUT"); + } } else { - snprintf(msgbuf, MSGSIZE, " %-20s%s", "output alarm:", interval_out); + /* Use variable output alarm based on timelevels */ + stream_mgr_add_variable_output_alarm_c(manager, streamID, &err); + if (err != 0) { + *status = 1; + return; + } + snprintf(msgbuf, MSGSIZE, " %-20s%s", "output alarm:", "variable (from output_timelevels)"); mpas_log_write_c(msgbuf, "MPAS_LOG_OUT"); } }