diff --git a/bbqueue/src/prod_cons/stream.rs b/bbqueue/src/prod_cons/stream.rs index bfcd7b9..779a390 100644 --- a/bbqueue/src/prod_cons/stream.rs +++ b/bbqueue/src/prod_cons/stream.rs @@ -63,6 +63,26 @@ where to_release: usize, } +/// A split reading grant into the storage buffer +/// +/// This is useful for reading data that wraps around the internal ring buffer. +/// +/// Grants implement methods to access the two underlying buffers. +/// +/// Write access is provided for read grants in case it is necessary to mutate +/// the storage in-place for decoding. +pub struct SplitGrantR +where + Q: BbqHandle, +{ + bbq: Q::Target, + ptr1: NonNull, + len1: usize, + ptr2: NonNull, + len2: usize, + to_release: usize, +} + // ---- impls ---- // ---- StreamProducer ---- @@ -174,6 +194,31 @@ where to_release: 0, }) } + + /// Obtain a split read grant + /// + /// This is useful for reading all available data when the data wraps + /// around the internal ring buffer. + pub fn split_read(&self) -> Result, ReadGrantError> { + let (ptr, _cap) = unsafe { self.bbq.sto.ptr_len() }; + let [(offset1, len1), (offset2, len2)] = self.bbq.cor.split_read()?; + let ptr1 = unsafe { + let p = ptr.as_ptr().byte_add(offset1); + NonNull::new_unchecked(p) + }; + let ptr2 = unsafe { + let p = ptr.as_ptr().byte_add(offset2); + NonNull::new_unchecked(p) + }; + Ok(SplitGrantR { + bbq: self.bbq.clone(), + ptr1, + len1, + ptr2, + len2, + to_release: 0, + }) + } } impl StreamConsumer @@ -312,3 +357,74 @@ where } unsafe impl Send for StreamGrantR {} + +// ---- SplitGrantR ---- + +impl SplitGrantR +where + Q: BbqHandle, +{ + /// Make `used` bytes available for writing. + /// + /// `used` is capped to the total length of the grant + pub fn release(self, used: usize) { + let used1 = used.min(self.len1); + let used2 = (used - used1).min(self.len2); + self.bbq.cor.split_release_inner(used1, used2); + if used != 0 { + self.bbq.not.wake_one_producer(); + } + core::mem::forget(self); + } +} + +impl SplitGrantR +where + Q: BbqHandle, +{ + /// Obtain the combined length of both buffers + pub fn combined_len(&self) -> usize { + self.len1 + self.len2 + } + + /// Obtain the two buffers represented by this split read grant + pub fn bufs(&self) -> (&[u8], &[u8]) { + let buf1 = unsafe { core::slice::from_raw_parts(self.ptr1.as_ptr(), self.len1) }; + let buf2 = unsafe { core::slice::from_raw_parts(self.ptr2.as_ptr(), self.len2) }; + (buf1, buf2) + } + + /// Obtain the two mutable buffers represented by this split read grant + pub fn bufs_mut(&mut self) -> (&mut [u8], &mut [u8]) { + let buf1 = unsafe { core::slice::from_raw_parts_mut(self.ptr1.as_ptr(), self.len1) }; + let buf2 = unsafe { core::slice::from_raw_parts_mut(self.ptr2.as_ptr(), self.len2) }; + (buf1, buf2) + } +} + +impl Drop for SplitGrantR +where + Q: BbqHandle, +{ + fn drop(&mut self) { + let SplitGrantR { + bbq, + ptr1: _, + len1, + ptr2: _, + len2, + to_release, + } = self; + let len1 = *len1; + let len2 = *len2; + let used = (*to_release).min(len1 + len2); + let used1 = used.min(len1); + let used2 = (used - used1).min(len2); + bbq.cor.split_release_inner(used1, used2); + if used != 0 { + bbq.not.wake_one_producer(); + } + } +} + +unsafe impl Send for SplitGrantR {} diff --git a/bbqueue/src/queue.rs b/bbqueue/src/queue.rs index b24feba..79d4401 100644 --- a/bbqueue/src/queue.rs +++ b/bbqueue/src/queue.rs @@ -168,4 +168,77 @@ mod test { } rgr.release(); } + + #[test] + fn split_sanity_check() { + let bb: BBQueue, AtomicCoord, Polling> = BBQueue::new(); + let prod = bb.stream_producer(); + let cons = bb.stream_consumer(); + + // Fill buffer + let mut wgrant = prod.grant_exact(10).unwrap(); + assert_eq!(wgrant.len(), 10); + wgrant.copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + wgrant.commit(10); + + let rgrant = cons.split_read().unwrap(); + assert_eq!(rgrant.combined_len(), 10); + assert_eq!( + rgrant.bufs(), + (&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10][..], &[][..]) + ); + // Release part of the buffer + rgrant.release(6); + + // Almost fill buffer again => | 11 | 12 | 13 | 14 | 15 | x | 7 | 8 | 9 | 10 | + let mut wgrant = prod.grant_exact(5).unwrap(); + assert_eq!(wgrant.len(), 5); + wgrant.copy_from_slice(&[11, 12, 13, 14, 15]); + wgrant.commit(5); + + let rgrant = cons.split_read().unwrap(); + assert_eq!(rgrant.combined_len(), 9); + assert_eq!( + rgrant.bufs(), + (&[7, 8, 9, 10][..], &[11, 12, 13, 14, 15][..]) + ); + + // Release part of the buffer => | x | x | x | 14 | 15 | x | x | x | x | x | + rgrant.release(7); + + // Check that it is possible to claim exactly the remaining space + assert!(prod.grant_exact(5).is_ok()); + + // Check that it is not possible to claim more space than what should be available + assert!(prod.grant_exact(6).is_err()); + + // Fill buffer to the end => | x | x | x | 14 | 15 | 21 | 22 | 23 | 24 | 25 | + let mut wgrant = prod.grant_exact(5).unwrap(); + wgrant.copy_from_slice(&[21, 22, 23, 24, 25]); + wgrant.commit(5); + + let rgrant = cons.split_read().unwrap(); + assert_eq!(rgrant.combined_len(), 7); + assert_eq!(rgrant.bufs(), (&[14, 15, 21, 22, 23, 24, 25][..], &[][..])); + rgrant.release(0); + + // Fill buffer to the end => | 26 | 27 | x | 14 | 15 | 21 | 22 | 23 | 24 | 25 | + let mut wgrant = prod.grant_exact(2).unwrap(); + wgrant.copy_from_slice(&[26, 27]); + wgrant.commit(2); + + // Fill buffer to the end => | x | 27 | x | x | x | x | x | x | x | x | + let rgrant = cons.split_read().unwrap(); + assert_eq!(rgrant.combined_len(), 9); + assert_eq!( + rgrant.bufs(), + (&[14, 15, 21, 22, 23, 24, 25][..], &[26, 27][..]) + ); + rgrant.release(8); + + let rgrant = cons.split_read().unwrap(); + assert_eq!(rgrant.combined_len(), 1); + assert_eq!(rgrant.bufs(), (&[27][..], &[][..])); + rgrant.release(1); + } } diff --git a/bbqueue/src/traits/coordination/cas.rs b/bbqueue/src/traits/coordination/cas.rs index ea19e35..3a63ae7 100644 --- a/bbqueue/src/traits/coordination/cas.rs +++ b/bbqueue/src/traits/coordination/cas.rs @@ -216,6 +216,45 @@ unsafe impl Coord for AtomicCoord { Ok((read, sz)) } + fn split_read(&self) -> Result<[(usize, usize); 2], ReadGrantError> { + if self.read_in_progress.swap(true, Ordering::AcqRel) { + return Err(ReadGrantError::GrantInProgress); + } + + let write = self.write.load(Ordering::Acquire); + let last = self.last.load(Ordering::Acquire); + let mut read = self.read.load(Ordering::Acquire); + + // Resolve the inverted case or end of read + if (read == last) && (write < read) { + read = 0; + // This has some room for error, the other thread reads this + // Impact to Grant: + // Grant checks if read < write to see if inverted. If not inverted, but + // no space left, Grant will initiate an inversion, but will not trigger it + // Impact to Commit: + // Commit does not check read, but if Grant has started an inversion, + // grant could move Last to the prior write position + // MOVING READ BACKWARDS! + self.read.store(0, Ordering::Release); + } + + let (sz1, sz2) = if write < read { + // Inverted, only believe last + (last - read, write) + } else { + // Not inverted, only believe write + (write - read, 0) + }; + + if sz1 == 0 { + self.read_in_progress.store(false, Ordering::Release); + return Err(ReadGrantError::Empty); + } + + Ok([(read, sz1), (0, sz2)]) + } + fn commit_inner(&self, capacity: usize, grant_len: usize, used: usize) { // If there is no grant in progress, return early. This // generally means we are dropping the grant within a @@ -280,6 +319,27 @@ unsafe impl Coord for AtomicCoord { // This should be fine, purely incrementing let _ = self.read.fetch_add(used, Ordering::Release); + self.read_in_progress.store(false, Ordering::Release); + } + fn split_release_inner(&self, used1: usize, used2: usize) { + // If there is no grant in progress, return early. This + // generally means we are dropping the grant within a + // wrapper structure + if !self.read_in_progress.load(Ordering::Acquire) { + return; + } + + // // This should always be checked by the public interfaces + // debug_assert!(used <= self.buf.len()); + + if used2 == 0 { + // This should be fine, purely incrementing + let _ = self.read.fetch_add(used1, Ordering::Release); + } else { + // Also release parts of the second buffer + self.read.store(used2, Ordering::Release); + } + self.read_in_progress.store(false, Ordering::Release); } } diff --git a/bbqueue/src/traits/coordination/cs.rs b/bbqueue/src/traits/coordination/cs.rs index 5c5dd47..7ca3146 100644 --- a/bbqueue/src/traits/coordination/cs.rs +++ b/bbqueue/src/traits/coordination/cs.rs @@ -231,6 +231,48 @@ unsafe impl Coord for CsCoord { }) } + fn split_read(&self) -> Result<[(usize, usize); 2], ReadGrantError> { + critical_section::with(|_cs| { + if self.read_in_progress.load(Ordering::Relaxed) { + return Err(ReadGrantError::GrantInProgress); + } + self.read_in_progress.store(true, Ordering::Relaxed); + + let write = self.write.load(Ordering::Relaxed); + let last = self.last.load(Ordering::Relaxed); + let mut read = self.read.load(Ordering::Relaxed); + + // Resolve the inverted case or end of read + if (read == last) && (write < read) { + read = 0; + // This has some room for error, the other thread reads this + // Impact to Grant: + // Grant checks if read < write to see if inverted. If not inverted, but + // no space left, Grant will initiate an inversion, but will not trigger it + // Impact to Commit: + // Commit does not check read, but if Grant has started an inversion, + // grant could move Last to the prior write position + // MOVING READ BACKWARDS! + self.read.store(0, Ordering::Relaxed); + } + + let (sz1, sz2) = if write < read { + // Inverted, only believe last + (last - read, write) + } else { + // Not inverted, only believe write + (write - read, 0) + }; + + if sz1 == 0 { + self.read_in_progress.store(false, Ordering::Relaxed); + return Err(ReadGrantError::Empty); + } + + Ok([(read, sz1), (0, sz2)]) + }) + } + fn commit_inner(&self, capacity: usize, grant_len: usize, used: usize) { critical_section::with(|_cs| { // If there is no grant in progress, return early. This @@ -303,4 +345,26 @@ unsafe impl Coord for CsCoord { self.read_in_progress.store(false, Ordering::Relaxed); }) } + fn split_release_inner(&self, used1: usize, used2: usize) { + critical_section::with(|_cs| { + // If there is no grant in progress, return early. This + // generally means we are dropping the grant within a + // wrapper structure + if !self.read_in_progress.load(Ordering::Acquire) { + return; + } + + // // This should always be checked by the public interfaces + // debug_assert!(used <= self.buf.len()); + + // This should be fine, purely incrementing + let old_read = self.read.load(Ordering::Relaxed); + if used2 == 0 { + self.read.store(used1 + old_read, Ordering::Relaxed); + } else { + self.read.store(used2, Ordering::Relaxed); + } + self.read_in_progress.store(false, Ordering::Relaxed); + }) + } } diff --git a/bbqueue/src/traits/coordination/mod.rs b/bbqueue/src/traits/coordination/mod.rs index 3e5a1e0..4936cf6 100644 --- a/bbqueue/src/traits/coordination/mod.rs +++ b/bbqueue/src/traits/coordination/mod.rs @@ -84,9 +84,20 @@ pub unsafe trait Coord: ConstInit { // Read Grants - /// Attempt to obtain a read grant. + /// Attempt to obtain a read grant. Returns `Ok((start, size))` on success. fn read(&self) -> Result<(usize, usize), ReadGrantError>; + /// Attempt to obtain a split read grant. Return `Ok(((start1, size1), (start2, size2)))` on success. + fn split_read(&self) -> Result<[(usize, usize); 2], ReadGrantError> { + unimplemented!() + } + /// Mark `used` bytes as available for writing fn release_inner(&self, used: usize); + + /// Mark `used1 + used2` bytes as available for writing. + /// `used1` corresponds to the first split grant, `used2` to the second. + fn split_release_inner(&self, _used1: usize, _used2: usize) { + unimplemented!() + } }