Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions bbqueue/src/prod_cons/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ where
to_release: usize,
}

/// A split reading grant into the storage buffer
///
/// This is useful for reading data that wraps around the internal ring buffer.
///
/// Grants implement methods to access the two underlying buffers.
///
/// Write access is provided for read grants in case it is necessary to mutate
/// the storage in-place for decoding.
pub struct SplitGrantR<Q>
where
Q: BbqHandle,
{
bbq: Q::Target,
ptr1: NonNull<u8>,
len1: usize,
ptr2: NonNull<u8>,
len2: usize,
to_release: usize,
}

// ---- impls ----

// ---- StreamProducer ----
Expand Down Expand Up @@ -174,6 +194,31 @@ where
to_release: 0,
})
}

/// Obtain a split read grant
///
/// This is useful for reading all available data when the data wraps
/// around the internal ring buffer.
pub fn split_read(&self) -> Result<SplitGrantR<Q>, ReadGrantError> {
let (ptr, _cap) = unsafe { self.bbq.sto.ptr_len() };
let [(offset1, len1), (offset2, len2)] = self.bbq.cor.split_read()?;
let ptr1 = unsafe {
let p = ptr.as_ptr().byte_add(offset1);
NonNull::new_unchecked(p)
};
let ptr2 = unsafe {
let p = ptr.as_ptr().byte_add(offset2);
NonNull::new_unchecked(p)
};
Ok(SplitGrantR {
bbq: self.bbq.clone(),
ptr1,
len1,
ptr2,
len2,
to_release: 0,
})
}
}

impl<Q> StreamConsumer<Q>
Expand Down Expand Up @@ -312,3 +357,74 @@ where
}

unsafe impl<Q: BbqHandle + Send> Send for StreamGrantR<Q> {}

// ---- SplitGrantR ----

impl<Q> SplitGrantR<Q>
where
Q: BbqHandle,
{
/// Make `used` bytes available for writing.
///
/// `used` is capped to the total length of the grant
pub fn release(self, used: usize) {
let used1 = used.min(self.len1);
let used2 = (used - used1).min(self.len2);
self.bbq.cor.split_release_inner(used1, used2);
if used != 0 {
self.bbq.not.wake_one_producer();
}
core::mem::forget(self);
}
}

impl<Q> SplitGrantR<Q>
where
Q: BbqHandle,
{
/// Obtain the combined length of both buffers
pub fn combined_len(&self) -> usize {
self.len1 + self.len2
}

/// Obtain the two buffers represented by this split read grant
pub fn bufs(&self) -> (&[u8], &[u8]) {
let buf1 = unsafe { core::slice::from_raw_parts(self.ptr1.as_ptr(), self.len1) };
let buf2 = unsafe { core::slice::from_raw_parts(self.ptr2.as_ptr(), self.len2) };
(buf1, buf2)
}

/// Obtain the two mutable buffers represented by this split read grant
pub fn bufs_mut(&mut self) -> (&mut [u8], &mut [u8]) {
let buf1 = unsafe { core::slice::from_raw_parts_mut(self.ptr1.as_ptr(), self.len1) };
let buf2 = unsafe { core::slice::from_raw_parts_mut(self.ptr2.as_ptr(), self.len2) };
(buf1, buf2)
}
}

impl<Q> Drop for SplitGrantR<Q>
where
Q: BbqHandle,
{
fn drop(&mut self) {
let SplitGrantR {
bbq,
ptr1: _,
len1,
ptr2: _,
len2,
to_release,
} = self;
let len1 = *len1;
let len2 = *len2;
let used = (*to_release).min(len1 + len2);
let used1 = used.min(len1);
let used2 = (used - used1).min(len2);
bbq.cor.split_release_inner(used1, used2);
if used != 0 {
bbq.not.wake_one_producer();
}
}
}

unsafe impl<Q: BbqHandle + Send> Send for SplitGrantR<Q> {}
73 changes: 73 additions & 0 deletions bbqueue/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,77 @@ mod test {
}
rgr.release();
}

#[test]
fn split_sanity_check() {
let bb: BBQueue<Inline<10>, AtomicCoord, Polling> = BBQueue::new();
let prod = bb.stream_producer();
let cons = bb.stream_consumer();

// Fill buffer
let mut wgrant = prod.grant_exact(10).unwrap();
assert_eq!(wgrant.len(), 10);
wgrant.copy_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
wgrant.commit(10);

let rgrant = cons.split_read().unwrap();
assert_eq!(rgrant.combined_len(), 10);
assert_eq!(
rgrant.bufs(),
(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10][..], &[][..])
);
// Release part of the buffer
rgrant.release(6);

// Almost fill buffer again => | 11 | 12 | 13 | 14 | 15 | x | 7 | 8 | 9 | 10 |
let mut wgrant = prod.grant_exact(5).unwrap();
assert_eq!(wgrant.len(), 5);
wgrant.copy_from_slice(&[11, 12, 13, 14, 15]);
wgrant.commit(5);

let rgrant = cons.split_read().unwrap();
assert_eq!(rgrant.combined_len(), 9);
assert_eq!(
rgrant.bufs(),
(&[7, 8, 9, 10][..], &[11, 12, 13, 14, 15][..])
);

// Release part of the buffer => | x | x | x | 14 | 15 | x | x | x | x | x |
rgrant.release(7);

// Check that it is possible to claim exactly the remaining space
assert!(prod.grant_exact(5).is_ok());

// Check that it is not possible to claim more space than what should be available
assert!(prod.grant_exact(6).is_err());

// Fill buffer to the end => | x | x | x | 14 | 15 | 21 | 22 | 23 | 24 | 25 |
let mut wgrant = prod.grant_exact(5).unwrap();
wgrant.copy_from_slice(&[21, 22, 23, 24, 25]);
wgrant.commit(5);

let rgrant = cons.split_read().unwrap();
assert_eq!(rgrant.combined_len(), 7);
assert_eq!(rgrant.bufs(), (&[14, 15, 21, 22, 23, 24, 25][..], &[][..]));
rgrant.release(0);

// Fill buffer to the end => | 26 | 27 | x | 14 | 15 | 21 | 22 | 23 | 24 | 25 |
let mut wgrant = prod.grant_exact(2).unwrap();
wgrant.copy_from_slice(&[26, 27]);
wgrant.commit(2);

// Fill buffer to the end => | x | 27 | x | x | x | x | x | x | x | x |
let rgrant = cons.split_read().unwrap();
assert_eq!(rgrant.combined_len(), 9);
assert_eq!(
rgrant.bufs(),
(&[14, 15, 21, 22, 23, 24, 25][..], &[26, 27][..])
);
rgrant.release(8);

let rgrant = cons.split_read().unwrap();
assert_eq!(rgrant.combined_len(), 1);
assert_eq!(rgrant.bufs(), (&[27][..], &[][..]));
rgrant.release(1);
}
}
60 changes: 60 additions & 0 deletions bbqueue/src/traits/coordination/cas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,45 @@ unsafe impl Coord for AtomicCoord {
Ok((read, sz))
}

fn split_read(&self) -> Result<[(usize, usize); 2], ReadGrantError> {
if self.read_in_progress.swap(true, Ordering::AcqRel) {
return Err(ReadGrantError::GrantInProgress);
}

let write = self.write.load(Ordering::Acquire);
let last = self.last.load(Ordering::Acquire);
let mut read = self.read.load(Ordering::Acquire);

// Resolve the inverted case or end of read
if (read == last) && (write < read) {
read = 0;
// This has some room for error, the other thread reads this
// Impact to Grant:
// Grant checks if read < write to see if inverted. If not inverted, but
// no space left, Grant will initiate an inversion, but will not trigger it
// Impact to Commit:
// Commit does not check read, but if Grant has started an inversion,
// grant could move Last to the prior write position
// MOVING READ BACKWARDS!
self.read.store(0, Ordering::Release);
}

let (sz1, sz2) = if write < read {
// Inverted, only believe last
(last - read, write)
} else {
// Not inverted, only believe write
(write - read, 0)
};

if sz1 == 0 {
self.read_in_progress.store(false, Ordering::Release);
return Err(ReadGrantError::Empty);
}

Ok([(read, sz1), (0, sz2)])
}

fn commit_inner(&self, capacity: usize, grant_len: usize, used: usize) {
// If there is no grant in progress, return early. This
// generally means we are dropping the grant within a
Expand Down Expand Up @@ -280,6 +319,27 @@ unsafe impl Coord for AtomicCoord {
// This should be fine, purely incrementing
let _ = self.read.fetch_add(used, Ordering::Release);

self.read_in_progress.store(false, Ordering::Release);
}
fn split_release_inner(&self, used1: usize, used2: usize) {
// If there is no grant in progress, return early. This
// generally means we are dropping the grant within a
// wrapper structure
if !self.read_in_progress.load(Ordering::Acquire) {
return;
}

// // This should always be checked by the public interfaces
// debug_assert!(used <= self.buf.len());

if used2 == 0 {
// This should be fine, purely incrementing
let _ = self.read.fetch_add(used1, Ordering::Release);
} else {
// Also release parts of the second buffer
self.read.store(used2, Ordering::Release);
}

self.read_in_progress.store(false, Ordering::Release);
}
}
64 changes: 64 additions & 0 deletions bbqueue/src/traits/coordination/cs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,48 @@ unsafe impl Coord for CsCoord {
})
}

fn split_read(&self) -> Result<[(usize, usize); 2], ReadGrantError> {
critical_section::with(|_cs| {
if self.read_in_progress.load(Ordering::Relaxed) {
return Err(ReadGrantError::GrantInProgress);
}
self.read_in_progress.store(true, Ordering::Relaxed);

let write = self.write.load(Ordering::Relaxed);
let last = self.last.load(Ordering::Relaxed);
let mut read = self.read.load(Ordering::Relaxed);

// Resolve the inverted case or end of read
if (read == last) && (write < read) {
read = 0;
// This has some room for error, the other thread reads this
// Impact to Grant:
// Grant checks if read < write to see if inverted. If not inverted, but
// no space left, Grant will initiate an inversion, but will not trigger it
// Impact to Commit:
// Commit does not check read, but if Grant has started an inversion,
// grant could move Last to the prior write position
// MOVING READ BACKWARDS!
self.read.store(0, Ordering::Relaxed);
}

let (sz1, sz2) = if write < read {
// Inverted, only believe last
(last - read, write)
} else {
// Not inverted, only believe write
(write - read, 0)
};

if sz1 == 0 {
self.read_in_progress.store(false, Ordering::Relaxed);
return Err(ReadGrantError::Empty);
}

Ok([(read, sz1), (0, sz2)])
})
}

fn commit_inner(&self, capacity: usize, grant_len: usize, used: usize) {
critical_section::with(|_cs| {
// If there is no grant in progress, return early. This
Expand Down Expand Up @@ -303,4 +345,26 @@ unsafe impl Coord for CsCoord {
self.read_in_progress.store(false, Ordering::Relaxed);
})
}
fn split_release_inner(&self, used1: usize, used2: usize) {
critical_section::with(|_cs| {
// If there is no grant in progress, return early. This
// generally means we are dropping the grant within a
// wrapper structure
if !self.read_in_progress.load(Ordering::Acquire) {
return;
}

// // This should always be checked by the public interfaces
// debug_assert!(used <= self.buf.len());

// This should be fine, purely incrementing
let old_read = self.read.load(Ordering::Relaxed);
if used2 == 0 {
self.read.store(used1 + old_read, Ordering::Relaxed);
} else {
self.read.store(used2, Ordering::Relaxed);
}
self.read_in_progress.store(false, Ordering::Relaxed);
})
}
}
13 changes: 12 additions & 1 deletion bbqueue/src/traits/coordination/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,20 @@ pub unsafe trait Coord: ConstInit {

// Read Grants

/// Attempt to obtain a read grant.
/// Attempt to obtain a read grant. Returns `Ok((start, size))` on success.
fn read(&self) -> Result<(usize, usize), ReadGrantError>;

/// Attempt to obtain a split read grant. Return `Ok(((start1, size1), (start2, size2)))` on success.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it's worth adding a default impl to this that always returns an error, in order to avoid a breaking change for now?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we DO make a breaking change, I wonder if it's worth making an struct OffsetSlice { offset: usize, len: usize }, and returning Result<[OffsetSlice; 2], ReadGrantError> or something in order to avoid complex tuple types (esp. since both are usizes types!)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe instead of an error, always return an empty second section?
Though that might be more confusing in terms of debugging in some cases.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that might be "too clever by half", yeah.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another idea I just had would be to have a new trait SplitCoord: Coord that has split_read and split_release_inner (which I realized I need too). Then SplitGrantR can be given out only if Q::Coord :SplitCoord.
Adding a default impl would be fine, too. What do you prefer?
I like the OffsetSlice idea.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open to the specialized trait idea, as long as it looks reasonable in practice! I know we do that with Notify already.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That being said, I don't think having split grants adds any overhead, like having async notify would. That actually makes me think maybe we don't specialize it? Sorry to give you conflicting opinions, but I'd actually maybe save "specialization" traits for cases where having the specialized version "costs" more.

fn split_read(&self) -> Result<[(usize, usize); 2], ReadGrantError> {
unimplemented!()
}

/// Mark `used` bytes as available for writing
fn release_inner(&self, used: usize);

/// Mark `used1 + used2` bytes as available for writing.
/// `used1` corresponds to the first split grant, `used2` to the second.
fn split_release_inner(&self, _used1: usize, _used2: usize) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not super happy with this signature, but I can't put my finger on it...
What do you think?

unimplemented!()
}
}