Skip to content

Commit

Permalink
sound: clean up and improve comments, definitions
Browse files Browse the repository at this point in the history
- Add some documentation comments
- Rename TxState to IoState in preparation for RX implementation
- Fix inline rust code not being escaped properly in doc comments
- Rename some fields to make their function clearer
- Cleanup some error log messages
- Replace an old TODO comment about queue size with an explanation

Signed-off-by: Manos Pitsidianakis <manos.pitsidianakis@linaro.org>
  • Loading branch information
epilys authored and stsquad committed Oct 24, 2023
1 parent 04f80fc commit 30d5cf4
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 44 deletions.
25 changes: 16 additions & 9 deletions staging/vhost-device-sound/src/audio_backends/alsa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,14 @@ fn update_pcm(
Ok(())
}

// Returns `true` if the function should be called again, because there are are
// more data left to write.
// Returns `Ok(true)` if the function should be called again, because there are
// are more data left to write.
fn write_samples_direct(
pcm: &alsa::PCM,
stream: &mut Stream,
mmap: &mut alsa::direct::pcm::MmapPlayback<u8>,
) -> AResult<bool> {
while mmap.avail() > 0 {
// Write samples to DMA area from iterator
let Some(buffer) = stream.buffers.front_mut() else {
return Ok(false);
};
Expand All @@ -223,6 +222,7 @@ fn write_samples_direct(
}
Ok(v) => v,
};
// Write samples to DMA area from iterator
let mut iter = buf[0..read_bytes as usize].iter().cloned();
let frames = mmap.write(&mut iter);
let written_bytes = pcm.frames_to_bytes(frames);
Expand All @@ -240,6 +240,8 @@ fn write_samples_direct(
}
}

// Returns `Ok(true)` if the function should be called again, because there are
// are more data left to write.
fn write_samples_io(
p: &alsa::PCM,
streams: &Arc<RwLock<Vec<Stream>>>,
Expand Down Expand Up @@ -385,6 +387,9 @@ impl AlsaBackend {
let mut senders = Vec::with_capacity(streams_no);
for i in 0..streams_no {
let (sender, receiver) = channel();

// Initialize with a dummy value, which will be updated every time we call
// `update_pcm`.
let pcm = Arc::new(Mutex::new(PCM::new("default", Direction::Playback, false)?));

let mtx = Arc::clone(&pcm);
Expand Down Expand Up @@ -495,16 +500,18 @@ impl AlsaBackend {
msg.code = VIRTIO_SND_S_BAD_MSG;
continue;
};
// Stop the worker.
// Stop worker thread
senders[stream_id].send(false).unwrap();
let mut streams = streams.write().unwrap();
if let Err(err) = streams[stream_id].state.release() {
log::error!("Stream {}: {}", stream_id, err);
msg.code = VIRTIO_SND_S_BAD_MSG;
}
// Release buffers even if state transition is invalid. If it is invalid, we
// won't be in a valid device state anyway so better to get rid of them and
// free the virt queue.
// Drop pending stream buffers to complete pending I/O messages
//
// This will release buffers even if state transition is invalid. If it is
// invalid, we won't be in a valid device state anyway so better to get rid of
// them and free the virt queue.
std::mem::take(&mut streams[stream_id].buffers);
}
AlsaAction::SetParameters(stream_id, mut msg) => {
Expand Down Expand Up @@ -545,9 +552,9 @@ impl AlsaBackend {
st.params.format = request.format;
st.params.rate = request.rate;
}
// Manually drop msg for faster response: the kernel has a timeout.
drop(msg);
}
// Manually drop msg for faster response: the kernel has a timeout.
drop(msg);
update_pcm(&pcms[stream_id], stream_id, &streams)?;
}
}
Expand Down
60 changes: 36 additions & 24 deletions staging/vhost-device-sound/src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,42 +467,51 @@ impl VhostUserSoundThread {
return Ok(true);
}

// Instead of counting descriptor chain lengths, encode the "parsing" logic in
// an enumeration. Then, the compiler will complain about any unhandled
// match {} cases if any part of the code is changed. This makes invalid
// states unrepresentable in the source code.
#[derive(Copy, Clone, PartialEq, Debug)]
enum TxState {
enum IoState {
Ready,
WaitingBufferForStreamId(u32),
Done,
}

// Keep log of stream IDs to wake up, in case the guest has queued more than
// one.
let mut stream_ids = BTreeSet::default();

for desc_chain in requests {
let mut state = TxState::Ready;
let mut state = IoState::Ready;
let mut buffers = vec![];
let descriptors: Vec<_> = desc_chain.clone().collect();
let message = Arc::new(IOMessage {
vring: vring.clone(),
status: VIRTIO_SND_S_OK.into(),
latency_bytes: 0.into(),
desc_chain: desc_chain.clone(),
descriptor: descriptors.last().cloned().unwrap(),
response_descriptor: descriptors.last().cloned().ok_or_else(|| {
log::error!("Received IO request with an empty descriptor chain.");
Error::UnexpectedDescriptorCount(0)
})?,
});
for descriptor in &descriptors {
match state {
TxState::Done => {
IoState::Done => {
return Err(Error::UnexpectedDescriptorCount(descriptors.len()).into());
}
TxState::Ready if descriptor.is_write_only() => {
IoState::Ready if descriptor.is_write_only() => {
if descriptor.len() as usize != size_of::<VirtioSoundPcmStatus>() {
return Err(Error::UnexpectedDescriptorSize(
size_of::<VirtioSoundPcmStatus>(),
descriptor.len(),
)
.into());
}
state = TxState::Done;
state = IoState::Done;
}
TxState::WaitingBufferForStreamId(stream_id) if descriptor.is_write_only() => {
IoState::WaitingBufferForStreamId(stream_id) if descriptor.is_write_only() => {
if descriptor.len() as usize != size_of::<VirtioSoundPcmStatus>() {
return Err(Error::UnexpectedDescriptorSize(
size_of::<VirtioSoundPcmStatus>(),
Expand All @@ -514,9 +523,9 @@ impl VhostUserSoundThread {
for b in std::mem::take(&mut buffers) {
streams[stream_id as usize].buffers.push_back(b);
}
state = TxState::Done;
state = IoState::Done;
}
TxState::Ready
IoState::Ready
if descriptor.len() as usize != size_of::<VirtioSoundPcmXfer>() =>
{
return Err(Error::UnexpectedDescriptorSize(
Expand All @@ -525,17 +534,17 @@ impl VhostUserSoundThread {
)
.into());
}
TxState::Ready => {
IoState::Ready => {
let xfer = desc_chain
.memory()
.read_obj::<VirtioSoundPcmXfer>(descriptor.addr())
.map_err(|_| Error::DescriptorReadFailed)?;
let stream_id: u32 = xfer.stream_id.into();
stream_ids.insert(stream_id);

state = TxState::WaitingBufferForStreamId(stream_id);
state = IoState::WaitingBufferForStreamId(stream_id);
}
TxState::WaitingBufferForStreamId(stream_id)
IoState::WaitingBufferForStreamId(stream_id)
if descriptor.len() as usize == size_of::<VirtioSoundPcmXfer>() =>
{
return Err(Error::UnexpectedDescriptorSize(
Expand All @@ -548,16 +557,16 @@ impl VhostUserSoundThread {
)
.into());
}
TxState::WaitingBufferForStreamId(_stream_id) => {
/*
Rather than copying the content of a descriptor, buffer keeps a pointer to it.
When we copy just after the request is enqueued, the guest's userspace may or
may not have updated the buffer contents. Guest driver simply moves buffers
from the used ring to the available ring without knowing whether the content
has been updated. The device only reads the buffer from guest memory when the
audio engine requires it, which is about after a period thus ensuring that the
buffer is up-to-date.
*/
IoState::WaitingBufferForStreamId(_stream_id) => {
// In the case of TX/Playback:
//
// Rather than copying the content of a descriptor, buffer keeps a pointer
// to it. When we copy just after the request is enqueued, the guest's
// userspace may or may not have updated the buffer contents. Guest driver
// simply moves buffers from the used ring to the available ring without
// knowing whether the content has been updated. The device only reads the
// buffer from guest memory when the audio engine requires it, which is
// about after a period thus ensuring that the buffer is up-to-date.
buffers.push(Buffer::new(*descriptor, Arc::clone(&message)));
}
}
Expand Down Expand Up @@ -626,7 +635,7 @@ impl VhostUserSoundBackend {
},
];
let chmaps: Arc<RwLock<Vec<VirtioSoundChmapInfo>>> = Arc::new(RwLock::new(chmaps_info));
log::trace!("VhostUserSoundBackend::new config {:?}", &config);
log::trace!("VhostUserSoundBackend::new(config = {:?})", &config);
let threads = if config.multi_thread {
vec![
RwLock::new(VhostUserSoundThread::new(
Expand Down Expand Up @@ -691,7 +700,10 @@ impl VhostUserBackend<VringRwLock, ()> for VhostUserSoundBackend {
}

fn max_queue_size(&self) -> usize {
// TODO: Investigate if an alternative value makes any difference.
// The linux kernel driver does no checks for queue length and fails silently if
// a queue is filled up. In this case, adding an element to the queue
// returns ENOSPC and the element is not queued for a later attempt and
// is lost. `64` is a "good enough" value from our observations.
64
}

Expand Down
21 changes: 10 additions & 11 deletions staging/vhost-device-sound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,46 +250,45 @@ pub struct IOMessage {
status: std::sync::atomic::AtomicU32,
pub latency_bytes: std::sync::atomic::AtomicU32,
desc_chain: SoundDescriptorChain,
descriptor: virtio_queue::Descriptor,
response_descriptor: virtio_queue::Descriptor,
vring: VringRwLock,
}

impl Drop for IOMessage {
fn drop(&mut self) {
log::trace!("dropping IOMessage");
let resp = VirtioSoundPcmStatus {
status: self.status.load(std::sync::atomic::Ordering::SeqCst).into(),
latency_bytes: self
.latency_bytes
.load(std::sync::atomic::Ordering::SeqCst)
.into(),
};
log::trace!("dropping IOMessage {:?}", resp);

if let Err(err) = self
.desc_chain
.memory()
.write_obj(resp, self.descriptor.addr())
.write_obj(resp, self.response_descriptor.addr())
{
log::error!("Error::DescriptorWriteFailed: {}", err);
return;
}
if self
if let Err(err) = self
.vring
.add_used(self.desc_chain.head_index(), resp.as_slice().len() as u32)
.is_err()
{
log::error!("Couldn't add used");
log::error!("Couldn't add used bytes count to vring: {}", err);
}
if self.vring.signal_used_queue().is_err() {
log::error!("Couldn't signal used queue");
if let Err(err) = self.vring.signal_used_queue() {
log::error!("Couldn't signal used queue: {}", err);
}
}
}

/// This is the public API through which an external program starts the
/// vhost-device-sound backend server.
pub fn start_backend_server(config: SoundConfig) {
log::trace!("Using config {:?}", &config);
log::trace!("Using config {:?}.", &config);
let listener = Listener::new(config.get_socket_path(), true).unwrap();
let backend = Arc::new(VhostUserSoundBackend::new(config).unwrap());

Expand All @@ -300,12 +299,12 @@ pub fn start_backend_server(config: SoundConfig) {
)
.unwrap();

log::trace!("Starting daemon");
log::trace!("Starting daemon.");
daemon.start(listener).unwrap();

match daemon.wait() {
Ok(()) => {
info!("Stopping cleanly");
info!("Stopping cleanly.");
}
Err(vhost_user_backend::Error::HandleRequest(vhost_user::Error::PartialMessage)) => {
info!(
Expand Down

0 comments on commit 30d5cf4

Please sign in to comment.