mod ordered_acker;
mod plenary_ack;
use std::collections::HashMap;
use std::string::FromUtf8Error;
use std::sync::{Arc, Mutex};
use thiserror::Error;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use crate::control_packet::{Publish, QoS};
use crate::error::AckError;
use crate::interface::{CompletionToken, MqttAck};
use crate::session::receiver::{
ordered_acker::{OrderedAcker, PkidAckQueue, PkidError},
plenary_ack::{PlenaryAck, PlenaryAckMember},
};
use crate::topic::{TopicFilter, TopicName, TopicParseError};
#[derive(Debug)]
pub struct AckToken(PlenaryAckMember);
impl AckToken {
pub async fn ack(self) -> Result<CompletionToken, AckError> {
self.0.ack().await
}
}
pub type PublishTx = UnboundedSender<(Publish, Option<AckToken>)>;
pub type PublishRx = UnboundedReceiver<(Publish, Option<AckToken>)>;
#[derive(Error, Debug)]
pub enum DispatchError {
#[error("could not get topic from publish: {0}")]
InvalidPublishTopic(#[from] InvalidPublish),
#[error("packet ID for publish invalid: {0}")]
InvalidPublishPkid(#[from] PkidError),
}
#[derive(Error, Debug)]
pub enum InvalidPublish {
#[error("invalid UTF-8")]
TopicNameUtf8(#[from] FromUtf8Error),
#[error("invalid topic: {0}")]
TopicNameFormat(#[from] TopicParseError),
}
#[derive(Default)]
pub struct PublishReceiverManager {
filtered_txs: HashMap<TopicFilter, Vec<PublishTx>>,
unfiltered_txs: Vec<PublishTx>,
}
impl PublishReceiverManager {
pub fn create_filtered_receiver(&mut self, topic_filter: &TopicFilter) -> PublishRx {
self.prune_filtered_txs();
let (tx, rx) = unbounded_channel();
match self.filtered_txs.get_mut(topic_filter) {
Some(v) => {
v.push(tx);
}
_ => {
self.filtered_txs.insert(topic_filter.clone(), vec![tx]);
}
}
rx
}
pub fn create_unfiltered_receiver(&mut self) -> PublishRx {
let (tx, rx) = unbounded_channel();
self.unfiltered_txs.push(tx);
rx
}
fn prune_filtered_txs(&mut self) {
self.filtered_txs.retain(|_, v| {
v.retain(|tx| !tx.is_closed());
!v.is_empty()
});
}
}
pub struct IncomingPublishDispatcher<A>
where
A: MqttAck + Clone + Send + Sync + 'static,
{
acker: OrderedAcker<A>,
pkid_ack_queue: Arc<Mutex<PkidAckQueue>>,
receiver_manager: Arc<Mutex<PublishReceiverManager>>,
}
impl<A> IncomingPublishDispatcher<A>
where
A: MqttAck + Clone + Send + Sync + 'static,
{
pub fn new(acker: A) -> Self {
let pkid_ack_queue = Arc::new(Mutex::new(PkidAckQueue::default()));
let acker = OrderedAcker::new(acker, pkid_ack_queue.clone());
Self {
acker,
pkid_ack_queue,
receiver_manager: Arc::new(Mutex::new(PublishReceiverManager::default())),
}
}
pub fn get_receiver_manager(&self) -> Arc<Mutex<PublishReceiverManager>> {
self.receiver_manager.clone()
}
pub fn dispatch_publish(&mut self, publish: &Publish) -> Result<usize, DispatchError> {
let topic_name = extract_publish_topic_name(publish)?;
if publish.dup && self.pkid_ack_queue.lock().unwrap().contains(publish.pkid) {
log::debug!(
"Duplicate PUB received for PUB already owned (PKID {}). Discarding",
publish.pkid
);
return Ok(0);
}
let plenary_ack = {
if publish.qos == QoS::AtMostOnce {
None
} else {
self.pkid_ack_queue.lock().unwrap().insert(publish.pkid)?;
let ack_f = {
let acker = self.acker.clone();
let publish = publish.clone();
async move {
let result = acker.ordered_ack(&publish).await;
if result.is_ok() {
log::debug!("Sent ACK for PKID {}", publish.pkid);
} else {
log::error!("ACK failed for PKID {}", publish.pkid);
}
result
}
};
Some(PlenaryAck::new(ack_f))
}
};
let mut num_dispatches = 0;
num_dispatches += self.dispatch_filtered(&topic_name, publish, plenary_ack.as_ref());
if num_dispatches == 0 {
num_dispatches += self.dispatch_unfiltered(publish, plenary_ack.as_ref());
}
log::debug!(
"Dispatched PUB with PKID {} to {num_dispatches} receivers.",
publish.pkid
);
if let Some(plenary_ack) = plenary_ack {
plenary_ack.commence();
}
Ok(num_dispatches)
}
fn dispatch_filtered(
&mut self,
topic_name: &TopicName,
publish: &Publish,
plenary_ack: Option<&PlenaryAck>,
) -> usize {
let mut num_dispatches = 0;
let mut closed = vec![]; let mut receiver_manager = self.receiver_manager.lock().unwrap();
let filtered = receiver_manager
.filtered_txs
.iter()
.filter(|(topic_filter, _)| topic_filter.matches_topic_name(topic_name));
for (topic_filter, v) in filtered {
for (pos, tx) in v.iter().enumerate() {
match tx.send((publish.clone(), create_ack_token(plenary_ack))) {
Ok(()) => num_dispatches += 1,
Err(_) => closed.push((topic_filter.clone(), pos)),
}
}
}
for (topic_filter, pos) in closed.iter().rev() {
if let Some(v) = receiver_manager.filtered_txs.get_mut(topic_filter) {
v.remove(*pos);
if v.is_empty() {
receiver_manager.filtered_txs.remove(topic_filter);
}
}
}
num_dispatches
}
fn dispatch_unfiltered(
&mut self,
publish: &Publish,
plenary_ack: Option<&PlenaryAck>,
) -> usize {
let mut num_dispatches = 0;
let mut closed = vec![];
let mut receiver_manager = self.receiver_manager.lock().unwrap();
for (pos, tx) in receiver_manager.unfiltered_txs.iter().enumerate() {
match tx.send((publish.clone(), create_ack_token(plenary_ack))) {
Ok(()) => num_dispatches += 1,
Err(_) => closed.push(pos),
}
}
for pos in closed.iter().rev() {
receiver_manager.unfiltered_txs.remove(*pos);
}
num_dispatches
}
}
fn extract_publish_topic_name(publish: &Publish) -> Result<TopicName, InvalidPublish> {
Ok(TopicName::from_string(String::from_utf8(
publish.topic.to_vec(),
)?)?)
}
fn create_ack_token(plenary_ack: Option<&PlenaryAck>) -> Option<AckToken> {
plenary_ack.map(|plenary_ack| AckToken(plenary_ack.create_member()))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::control_packet::QoS;
use crate::interface_mocks::{MockClient, MockClientCall};
use std::str::FromStr;
use std::time::Duration;
use test_case::test_case;
use tokio::sync::mpsc::error::TryRecvError;
fn create_publish(topic_name: &TopicName, payload: &str, pkid: u16) -> Publish {
let mut publish = Publish::new(
topic_name.as_str(),
QoS::AtLeastOnce,
payload.to_string(),
None,
);
publish.pkid = pkid;
publish
}
fn create_publish_qos(topic_name: &TopicName, payload: &str, pkid: u16, qos: QoS) -> Publish {
let mut publish = Publish::new(topic_name.as_str(), qos, payload.to_string(), None);
if qos != QoS::AtMostOnce {
publish.pkid = pkid;
}
publish
}
fn assert_expected_recv_value(
recv_value: &(Publish, Option<AckToken>),
expected_publish: &Publish,
) {
let (publish, ack_token) = recv_value;
assert_eq!(publish, expected_publish);
if publish.qos == QoS::AtMostOnce {
assert!(ack_token.is_none());
} else {
assert!(ack_token.is_some());
}
}
#[test_case(QoS::AtMostOnce; "QoS 0")]
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn dispatch_no_receivers(qos: QoS) {
let client = MockClient::new();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish = create_publish_qos(&topic_name, "publish 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 0);
}
#[test_case(QoS::AtMostOnce; "QoS 0")]
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn dispatch_one_unfiltered_receiver(qos: QoS) {
let client = MockClient::new();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let mut unfiltered_rx = manager.lock().unwrap().create_unfiltered_receiver();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish = create_publish_qos(&topic_name, "publish 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 1);
assert_expected_recv_value(&unfiltered_rx.try_recv().unwrap(), &publish);
}
#[test_case(QoS::AtMostOnce; "QoS 0")]
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn dispatch_multiple_unfiltered_receivers(qos: QoS) {
let client = MockClient::new();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let mut unfiltered_rx1 = manager.lock().unwrap().create_unfiltered_receiver();
let mut unfiltered_rx2 = manager.lock().unwrap().create_unfiltered_receiver();
let mut unfiltered_rx3 = manager.lock().unwrap().create_unfiltered_receiver();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish = create_publish_qos(&topic_name, "payload", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 3);
assert_expected_recv_value(&unfiltered_rx1.try_recv().unwrap(), &publish);
assert_expected_recv_value(&unfiltered_rx2.try_recv().unwrap(), &publish);
assert_expected_recv_value(&unfiltered_rx3.try_recv().unwrap(), &publish);
}
#[test_case(QoS::AtMostOnce; "QoS 0")]
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn dispatch_matching_filtered_receiver_supercedes_unfiltered_receiver(qos: QoS) {
let client = MockClient::new();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let mut unfiltered_rx = manager.lock().unwrap().create_unfiltered_receiver();
let publish1 = create_publish_qos(&topic_name, "publish 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish1).unwrap(), 1);
assert_expected_recv_value(&unfiltered_rx.try_recv().unwrap(), &publish1);
let topic_filter1 = TopicFilter::from_str("finance/bonds/banker1").unwrap();
assert!(!topic_filter1.matches_topic_name(&topic_name));
let mut filtered_rx1 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter1);
let publish2 = create_publish_qos(&topic_name, "publish 2", 2, qos);
assert_eq!(dispatcher.dispatch_publish(&publish2).unwrap(), 1);
assert_expected_recv_value(&unfiltered_rx.try_recv().unwrap(), &publish2);
assert_eq!(filtered_rx1.try_recv().unwrap_err(), TryRecvError::Empty);
let topic_filter2 = TopicFilter::from_str("sport/tennis/player1").unwrap();
assert!(topic_filter2.matches_topic_name(&topic_name));
let mut filtered_rx2 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter2);
let publish3 = create_publish_qos(&topic_name, "publish 3", 3, qos);
assert_eq!(dispatcher.dispatch_publish(&publish3).unwrap(), 1);
assert_eq!(unfiltered_rx.try_recv().unwrap_err(), TryRecvError::Empty);
assert_eq!(filtered_rx1.try_recv().unwrap_err(), TryRecvError::Empty);
assert_expected_recv_value(&filtered_rx2.try_recv().unwrap(), &publish3);
drop(filtered_rx2);
let publish4 = create_publish_qos(&topic_name, "publish 4", 4, qos);
assert_eq!(dispatcher.dispatch_publish(&publish4).unwrap(), 1);
assert_expected_recv_value(&unfiltered_rx.try_recv().unwrap(), &publish4);
assert_eq!(filtered_rx1.try_recv().unwrap_err(), TryRecvError::Empty);
}
#[test_case(QoS::AtMostOnce; "QoS 0")]
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn dispatch_no_matching_filtered_receivers(qos: QoS) {
let client = MockClient::new();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let topic_filter = TopicFilter::from_str("finance/bonds/banker1").unwrap();
assert!(!topic_filter.matches_topic_name(&topic_name));
let mut filtered_rx = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter);
let publish = create_publish_qos(&topic_name, "publish 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 0);
assert_eq!(filtered_rx.try_recv().unwrap_err(), TryRecvError::Empty);
}
#[test_case(QoS::AtMostOnce; "QoS 0")]
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn dispatch_one_matching_filter_exact(qos: QoS) {
let client = MockClient::new();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let topic_filter = TopicFilter::from_str("sport/tennis/player1").unwrap();
assert!(topic_filter.matches_topic_name(&topic_name));
let mut filtered_rx1 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter);
let topic_filter2 = TopicFilter::from_str("sport/tennis/player2").unwrap();
assert!(!topic_filter2.matches_topic_name(&topic_name));
let mut filtered_rx2 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter2);
let publish = create_publish_qos(&topic_name, "publish 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 1);
assert_expected_recv_value(&filtered_rx1.try_recv().unwrap(), &publish);
assert_eq!(filtered_rx2.try_recv().unwrap_err(), TryRecvError::Empty);
}
#[test_case(QoS::AtMostOnce; "QoS 0")]
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn dispatch_one_matching_filter_wildcard(qos: QoS) {
let client = MockClient::new();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let topic_filter = TopicFilter::from_str("sport/+/player1").unwrap();
assert!(topic_filter.matches_topic_name(&topic_name));
let mut filtered_rx1 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter);
let topic_filter2 = TopicFilter::from_str("finance/#").unwrap();
assert!(!topic_filter2.matches_topic_name(&topic_name));
let mut filtered_rx2 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter2);
let publish = create_publish_qos(&topic_name, "publish 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 1);
assert_expected_recv_value(&filtered_rx1.try_recv().unwrap(), &publish);
assert_eq!(filtered_rx2.try_recv().unwrap_err(), TryRecvError::Empty);
}
#[test_case(QoS::AtMostOnce; "QoS 0")]
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn dispatch_multiple_matching_filters_overlapping(qos: QoS) {
let client = MockClient::new();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let topic_filter1 = TopicFilter::from_str("sport/+/player1").unwrap();
assert!(topic_filter1.matches_topic_name(&topic_name));
let mut filtered_rx1 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter1);
let topic_filter2 = TopicFilter::from_str("sport/tennis/#").unwrap();
assert!(topic_filter2.matches_topic_name(&topic_name));
let mut filtered_rx2 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter2);
let topic_filter3 = TopicFilter::from_str("finance/#").unwrap();
assert!(!topic_filter3.matches_topic_name(&topic_name));
let mut filtered_rx3 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter3);
let publish = create_publish_qos(&topic_name, "publish 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 2);
assert_expected_recv_value(&filtered_rx1.try_recv().unwrap(), &publish);
assert_expected_recv_value(&filtered_rx2.try_recv().unwrap(), &publish);
assert_eq!(filtered_rx3.try_recv().unwrap_err(), TryRecvError::Empty);
}
#[test_case(QoS::AtMostOnce; "QoS 0")]
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn dispatch_multiple_matching_filters_duplicate(qos: QoS) {
let client = MockClient::new();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let topic_filter1 = TopicFilter::from_str("sport/tennis/player1").unwrap(); let topic_filter2 = TopicFilter::from_str("sport/tennis/player1").unwrap(); let topic_filter3 = TopicFilter::from_str("sport/+/player1").unwrap(); let topic_filter4 = TopicFilter::from_str("sport/+/player1").unwrap(); assert!(topic_name.matches_topic_filter(&topic_filter1));
assert!(topic_name.matches_topic_filter(&topic_filter2));
assert!(topic_name.matches_topic_filter(&topic_filter3));
assert!(topic_name.matches_topic_filter(&topic_filter4));
let topic_filter5 = TopicFilter::from_str("finance/bonds/banker1").unwrap();
let topic_filter6 = TopicFilter::from_str("finance/bonds/banker1").unwrap();
let topic_filter7 = TopicFilter::from_str("sport/hockey/+").unwrap();
let topic_filter8 = TopicFilter::from_str("sport/hockey/+").unwrap();
assert!(!topic_name.matches_topic_filter(&topic_filter5));
assert!(!topic_name.matches_topic_filter(&topic_filter6));
assert!(!topic_name.matches_topic_filter(&topic_filter7));
assert!(!topic_name.matches_topic_filter(&topic_filter8));
let mut filtered_rx1 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter1);
let mut filtered_rx2 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter2);
let mut filtered_rx3 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter3);
let mut filtered_rx4 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter4);
let mut filtered_rx5 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter5);
let mut filtered_rx6 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter6);
let mut filtered_rx7 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter7);
let mut filtered_rx8 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter8);
let publish = create_publish_qos(&topic_name, "payload 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 4);
assert_expected_recv_value(&filtered_rx1.try_recv().unwrap(), &publish);
assert_expected_recv_value(&filtered_rx2.try_recv().unwrap(), &publish);
assert_expected_recv_value(&filtered_rx3.try_recv().unwrap(), &publish);
assert_expected_recv_value(&filtered_rx4.try_recv().unwrap(), &publish);
assert_eq!(filtered_rx5.try_recv().unwrap_err(), TryRecvError::Empty);
assert_eq!(filtered_rx6.try_recv().unwrap_err(), TryRecvError::Empty);
assert_eq!(filtered_rx7.try_recv().unwrap_err(), TryRecvError::Empty);
assert_eq!(filtered_rx8.try_recv().unwrap_err(), TryRecvError::Empty);
}
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn dispatch_duplicate_pkid_error(qos: QoS) {
let client = MockClient::new();
let mock_controller = client.mock_controller();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let mut unfiltered_rx = manager.lock().unwrap().create_unfiltered_receiver();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish1 = create_publish_qos(&topic_name, "publish 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish1).unwrap(), 1);
let (r_publish1, ack_token1) = unfiltered_rx.try_recv().unwrap();
assert_eq!(r_publish1, publish1);
let publish2 = create_publish_qos(&topic_name, "publish 2", 1, qos);
assert!(matches!(
dispatcher.dispatch_publish(&publish2).unwrap_err(),
DispatchError::InvalidPublishPkid(_)
));
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 0);
ack_token1.unwrap().ack().await.unwrap();
assert_eq!(mock_controller.ack_count(), 1);
assert_eq!(dispatcher.dispatch_publish(&publish2).unwrap(), 1);
let (r_publish2, ack_token2) = unfiltered_rx.try_recv().unwrap();
assert_eq!(r_publish2, publish2);
ack_token2.unwrap().ack().await.unwrap();
assert_eq!(mock_controller.ack_count(), 2);
}
#[test_case(QoS::AtMostOnce; "QoS 0")]
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn dispatch_invalid_topic_name_error(qos: QoS) {
let client = MockClient::new();
let mock_controller = client.mock_controller();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let unfiltered_rx = manager.lock().unwrap().create_unfiltered_receiver();
let invalid_topic_name = "";
assert!(!TopicName::is_valid_topic_name(invalid_topic_name));
let publish = Publish::new(invalid_topic_name, qos, "some payload", None);
assert!(matches!(
dispatcher.dispatch_publish(&publish).unwrap_err(),
DispatchError::InvalidPublishTopic(_)
));
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 0);
drop(unfiltered_rx);
}
#[tokio::test]
async fn create_and_drop_receivers() {
let client = MockClient::new();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let topic_filter = TopicFilter::from_str("sport/tennis/player1").unwrap();
assert!(topic_filter.matches_topic_name(&topic_name));
let publish1 = create_publish(&topic_name, "publish 1", 1);
assert_eq!(dispatcher.dispatch_publish(&publish1).unwrap(), 0);
let mut unfiltered_rx1 = manager.lock().unwrap().create_unfiltered_receiver();
let publish2 = create_publish(&topic_name, "publish 2", 2);
assert_eq!(dispatcher.dispatch_publish(&publish2).unwrap(), 1);
assert_expected_recv_value(&unfiltered_rx1.try_recv().unwrap(), &publish2);
let mut unfiltered_rx2 = manager.lock().unwrap().create_unfiltered_receiver();
let publish3 = create_publish(&topic_name, "publish 3", 3);
assert_eq!(dispatcher.dispatch_publish(&publish3).unwrap(), 2);
assert_expected_recv_value(&unfiltered_rx1.try_recv().unwrap(), &publish3);
assert_expected_recv_value(&unfiltered_rx2.try_recv().unwrap(), &publish3);
let mut filtered_rx1 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter);
let publish4 = create_publish(&topic_name, "publish 4", 4);
assert_eq!(dispatcher.dispatch_publish(&publish4).unwrap(), 1);
assert_expected_recv_value(&filtered_rx1.try_recv().unwrap(), &publish4);
assert_eq!(unfiltered_rx1.try_recv().unwrap_err(), TryRecvError::Empty);
assert_eq!(unfiltered_rx2.try_recv().unwrap_err(), TryRecvError::Empty);
let mut filtered_rx2 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter);
let publish5 = create_publish(&topic_name, "publish 5", 5);
assert_eq!(dispatcher.dispatch_publish(&publish5).unwrap(), 2);
assert_expected_recv_value(&filtered_rx1.try_recv().unwrap(), &publish5);
assert_expected_recv_value(&filtered_rx2.try_recv().unwrap(), &publish5);
assert_eq!(unfiltered_rx1.try_recv().unwrap_err(), TryRecvError::Empty);
assert_eq!(unfiltered_rx2.try_recv().unwrap_err(), TryRecvError::Empty);
drop(filtered_rx2);
let publish6 = create_publish(&topic_name, "publish 6", 6);
assert_eq!(dispatcher.dispatch_publish(&publish6).unwrap(), 1);
assert_expected_recv_value(&filtered_rx1.try_recv().unwrap(), &publish6);
assert_eq!(unfiltered_rx1.try_recv().unwrap_err(), TryRecvError::Empty);
assert_eq!(unfiltered_rx2.try_recv().unwrap_err(), TryRecvError::Empty);
drop(filtered_rx1);
let publish7 = create_publish(&topic_name, "publish 7", 7);
assert_eq!(dispatcher.dispatch_publish(&publish7).unwrap(), 2);
assert_expected_recv_value(&unfiltered_rx1.try_recv().unwrap(), &publish7);
assert_expected_recv_value(&unfiltered_rx2.try_recv().unwrap(), &publish7);
drop(unfiltered_rx2);
let publish8 = create_publish(&topic_name, "publish 8", 8);
assert_eq!(dispatcher.dispatch_publish(&publish8).unwrap(), 1);
assert_expected_recv_value(&unfiltered_rx1.try_recv().unwrap(), &publish8);
drop(unfiltered_rx1);
let publish9 = create_publish(&topic_name, "publish 9", 9);
assert_eq!(dispatcher.dispatch_publish(&publish9).unwrap(), 0);
}
#[tokio::test]
async fn full_filtered_receiver_cleanup_on_create() {
let client = MockClient::new();
let dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let topic_filter1 = TopicFilter::from_str("sport/tennis/player1").unwrap(); let topic_filter2 = topic_filter1.clone(); let topic_filter3 = topic_filter1.clone(); let topic_filter4 = TopicFilter::from_str("sport/+/player1").unwrap(); let topic_filter5 = topic_filter4.clone(); let topic_filter6 = TopicFilter::from_str("sport/#").unwrap(); let filter_rx1 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter1); let filter_rx2 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter2); let filter_rx3 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter3); let filter_rx4 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter4); let filter_rx5 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter5); let filter_rx6 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter6); assert_eq!(manager.lock().unwrap().filtered_txs.len(), 3);
assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter1)
.unwrap()
.len(),
3
); assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter4)
.unwrap()
.len(),
2
); assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter6)
.unwrap()
.len(),
1
); drop(filter_rx3); drop(filter_rx5); drop(filter_rx6); assert_eq!(manager.lock().unwrap().filtered_txs.len(), 3);
assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter1)
.unwrap()
.len(),
3
); assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter4)
.unwrap()
.len(),
2
); assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter6)
.unwrap()
.len(),
1
); let topic_filter7 = TopicFilter::from_str("finance/bonds/banker1").unwrap(); let filter_rx7 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter7); assert_eq!(manager.lock().unwrap().filtered_txs.len(), 3);
assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter1)
.unwrap()
.len(),
2
); assert!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter1)
.unwrap()
.iter()
.all(|tx| !tx.is_closed())
); assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter4)
.unwrap()
.len(),
1
); assert!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter4)
.unwrap()
.iter()
.all(|tx| !tx.is_closed())
); assert!(
!manager
.lock()
.unwrap()
.filtered_txs
.contains_key(&topic_filter6)
); assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter7)
.unwrap()
.len(),
1
); assert!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter7)
.unwrap()
.iter()
.all(|tx| !tx.is_closed())
); drop(filter_rx1); drop(filter_rx2); drop(filter_rx4); drop(filter_rx7); assert_eq!(manager.lock().unwrap().filtered_txs.len(), 3);
assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter1)
.unwrap()
.len(),
2
); assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter4)
.unwrap()
.len(),
1
); assert!(
!manager
.lock()
.unwrap()
.filtered_txs
.contains_key(&topic_filter6)
); assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter7)
.unwrap()
.len(),
1
); let topic_filter8 = topic_filter7.clone(); let filter_rx8 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter8); assert_eq!(manager.lock().unwrap().filtered_txs.len(), 1);
assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter8)
.unwrap()
.len(),
1
); assert!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter8)
.unwrap()
.iter()
.all(|tx| !tx.is_closed())
); drop(filter_rx8);
}
#[tokio::test]
async fn no_unfiltered_receiver_cleanup_on_create() {
let client = MockClient::new();
let dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let unfiltered_rx1 = manager.lock().unwrap().create_unfiltered_receiver();
let unfiltered_rx2 = manager.lock().unwrap().create_unfiltered_receiver();
let unfiltered_rx3 = manager.lock().unwrap().create_unfiltered_receiver();
assert_eq!(manager.lock().unwrap().unfiltered_txs.len(), 3);
drop(unfiltered_rx2);
assert_eq!(manager.lock().unwrap().unfiltered_txs.len(), 3);
let unfiltered_rx4 = manager.lock().unwrap().create_unfiltered_receiver();
assert_eq!(manager.lock().unwrap().unfiltered_txs.len(), 4);
drop(unfiltered_rx1);
drop(unfiltered_rx3);
drop(unfiltered_rx4);
}
#[tokio::test]
async fn lazy_filtered_receiver_cleanup_on_dispatch() {
let client = MockClient::new();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let topic_filter1 = TopicFilter::from_str("sport/#").unwrap(); let topic_filter2 = topic_filter1.clone(); let topic_filter3 = topic_filter1.clone(); let topic_filter4 = TopicFilter::from_str("sport/+/player1").unwrap(); let topic_filter5 = topic_filter4.clone(); let topic_filter6 = TopicFilter::from_str("sport/tennis/player1").unwrap(); let filter_rx1 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter1); let filter_rx2 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter2); let filter_rx3 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter3); let filter_rx4 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter4); let filter_rx5 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter5); let filter_rx6 = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter6); assert_eq!(manager.lock().unwrap().filtered_txs.len(), 3);
assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter3)
.unwrap()
.len(),
3
); assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter5)
.unwrap()
.len(),
2
); assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter6)
.unwrap()
.len(),
1
); drop(filter_rx3); drop(filter_rx5); drop(filter_rx6); assert_eq!(manager.lock().unwrap().filtered_txs.len(), 3);
assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter3)
.unwrap()
.len(),
3
); assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter5)
.unwrap()
.len(),
2
); assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter6)
.unwrap()
.len(),
1
); let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
assert!(topic_name.matches_topic_filter(&topic_filter3)); assert!(topic_name.matches_topic_filter(&topic_filter5)); assert!(topic_name.matches_topic_filter(&topic_filter6)); let publish = create_publish(&topic_name, "payload 1", 1);
dispatcher.dispatch_publish(&publish).unwrap();
assert_eq!(manager.lock().unwrap().filtered_txs.len(), 2);
assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter3)
.unwrap()
.len(),
2
); assert!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter3)
.unwrap()
.iter()
.all(|tx| !tx.is_closed())
); assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter5)
.unwrap()
.len(),
1
); assert!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter5)
.unwrap()
.iter()
.all(|tx| !tx.is_closed())
); assert!(
!manager
.lock()
.unwrap()
.filtered_txs
.contains_key(&topic_filter6)
); drop(filter_rx2); drop(filter_rx4); assert_eq!(manager.lock().unwrap().filtered_txs.len(), 2);
assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter2)
.unwrap()
.len(),
2
); assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter4)
.unwrap()
.len(),
1
); assert!(
!manager
.lock()
.unwrap()
.filtered_txs
.contains_key(&topic_filter6)
); let topic_name = TopicName::from_str("sport/tennis/player2").unwrap();
assert!(topic_name.matches_topic_filter(&topic_filter2)); assert!(!topic_name.matches_topic_filter(&topic_filter4)); let publish = create_publish(&topic_name, "payload 2", 2);
dispatcher.dispatch_publish(&publish).unwrap();
assert_eq!(manager.lock().unwrap().filtered_txs.len(), 2);
assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter2)
.unwrap()
.len(),
1
); assert!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter2)
.unwrap()
.iter()
.all(|tx| !tx.is_closed())
); assert_eq!(
manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter4)
.unwrap()
.len(),
1
); assert!(
!manager
.lock()
.unwrap()
.filtered_txs
.get(&topic_filter4)
.unwrap()
.iter()
.all(|tx| !tx.is_closed())
); drop(filter_rx1);
}
#[tokio::test]
async fn lazy_unfiltered_receiver_cleanup_on_dispatch() {
let client = MockClient::new();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let unfiltered_rx1 = manager.lock().unwrap().create_unfiltered_receiver();
let unfiltered_rx2 = manager.lock().unwrap().create_unfiltered_receiver();
let unfiltered_rx3 = manager.lock().unwrap().create_unfiltered_receiver();
assert_eq!(manager.lock().unwrap().unfiltered_txs.len(), 3);
drop(unfiltered_rx2);
assert_eq!(manager.lock().unwrap().unfiltered_txs.len(), 3);
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish = create_publish(&topic_name, "payload 1", 1);
dispatcher.dispatch_publish(&publish).unwrap();
assert_eq!(manager.lock().unwrap().unfiltered_txs.len(), 2);
drop(unfiltered_rx1);
drop(unfiltered_rx3);
}
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn dispatch_auto_ack_when_zero_dispatches(qos: QoS) {
let client = MockClient::new();
let mock_controller = client.mock_controller();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish = create_publish_qos(&topic_name, "publish 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 0);
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 1);
let calls = mock_controller.call_sequence();
assert_eq!(calls.len(), 1);
match &calls[0] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, publish);
}
_ => panic!("Expected AcknowledgePublish"),
}
mock_controller.reset_mock();
assert_eq!(mock_controller.ack_count(), 0);
let topic_filter = TopicFilter::from_str("finance/bonds/banker1").unwrap();
assert!(!topic_filter.matches_topic_name(&topic_name));
let filtered_rx = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 0);
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 1);
let calls = mock_controller.call_sequence();
assert_eq!(calls.len(), 1);
match &calls[0] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, publish);
}
_ => panic!("Expected AcknowledgePublish"),
}
mock_controller.reset_mock();
assert_eq!(mock_controller.ack_count(), 0);
drop(filtered_rx);
let topic_filter = TopicFilter::from_str("sport/tennis/player1").unwrap();
assert!(topic_filter.matches_topic_name(&topic_name));
let filtered_rx = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter);
drop(filtered_rx);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 0);
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 1);
let calls = mock_controller.call_sequence();
assert_eq!(calls.len(), 1);
match &calls[0] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, publish);
}
_ => panic!("Expected AcknowledgePublish"),
}
mock_controller.reset_mock();
assert_eq!(mock_controller.ack_count(), 0);
let unfiltered_rx = manager.lock().unwrap().create_unfiltered_receiver();
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 1);
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 0);
drop(unfiltered_rx);
}
#[tokio::test]
async fn dispatch_never_ack_on_qos0() {
let client = MockClient::new();
let mock_controller = client.mock_controller();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish = create_publish_qos(&topic_name, "publish 1", 0, QoS::AtMostOnce);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 0);
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 0);
let topic_filter = TopicFilter::from_str("finance/bonds/banker1").unwrap();
assert!(!topic_filter.matches_topic_name(&topic_name));
let _filtered_rx = manager
.lock()
.unwrap()
.create_filtered_receiver(&topic_filter);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 0);
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 0);
let mut unfiltered_rx = manager.lock().unwrap().create_unfiltered_receiver();
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 1);
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 0);
let (r_publish, ack_token) = unfiltered_rx.try_recv().unwrap();
assert_eq!(r_publish, publish);
assert!(ack_token.is_none());
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 0);
drop(unfiltered_rx);
drop(manager);
drop(dispatcher);
drop(publish);
drop(r_publish);
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 0);
}
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn ack_token_ordered_ack_single_receiver(qos: QoS) {
let client = MockClient::new();
let mock_controller = client.mock_controller();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let mut unfiltered_rx = manager.lock().unwrap().create_unfiltered_receiver();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish = create_publish_qos(&topic_name, "publish 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 1);
let (r_publish, ack_token) = unfiltered_rx.try_recv().unwrap();
assert_eq!(mock_controller.ack_count(), 0);
ack_token.unwrap().ack().await.unwrap();
assert_eq!(mock_controller.ack_count(), 1);
let calls = mock_controller.call_sequence();
assert_eq!(calls.len(), 1);
match &calls[0] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, r_publish);
}
_ => panic!("Expected AcknowledgePublish"),
}
}
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn ack_token_ordered_ack_multi_receiver(qos: QoS) {
let client = MockClient::new();
let mock_controller = client.mock_controller();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let mut unfiltered_rx1 = manager.lock().unwrap().create_unfiltered_receiver();
let mut unfiltered_rx2 = manager.lock().unwrap().create_unfiltered_receiver();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish = create_publish_qos(&topic_name, "payload", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 2);
let (r_publish1, ack_token1) = unfiltered_rx1.try_recv().unwrap();
let (r_publish2, ack_token2) = unfiltered_rx2.try_recv().unwrap();
assert_eq!(mock_controller.ack_count(), 0);
let jh1 = tokio::task::spawn(ack_token1.unwrap().ack());
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 0);
assert!(!jh1.is_finished());
let jh2 = tokio::task::spawn(ack_token2.unwrap().ack());
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 1);
let calls = mock_controller.call_sequence();
assert_eq!(calls.len(), 1);
match &calls[0] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, publish);
}
_ => panic!("Expected AcknowledgePublish"),
}
assert_eq!(r_publish1, publish);
assert_eq!(r_publish2, publish);
assert!(jh1.is_finished());
assert!(jh2.is_finished());
}
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn ack_token_unordered_ack_single_receiver(qos: QoS) {
let client = MockClient::new();
let mock_controller = client.mock_controller();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let mut unfiltered_rx = manager.lock().unwrap().create_unfiltered_receiver();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish1 = create_publish_qos(&topic_name, "payload 1", 1, qos);
let publish2 = create_publish_qos(&topic_name, "payload 2", 2, qos);
let publish3 = create_publish_qos(&topic_name, "payload 3", 3, qos);
let publish4 = create_publish_qos(&topic_name, "payload 4", 4, qos);
assert_eq!(dispatcher.dispatch_publish(&publish1).unwrap(), 1);
assert_eq!(dispatcher.dispatch_publish(&publish2).unwrap(), 1);
assert_eq!(dispatcher.dispatch_publish(&publish3).unwrap(), 1);
assert_eq!(dispatcher.dispatch_publish(&publish4).unwrap(), 1);
let (r_publish1, ack_token1) = unfiltered_rx.try_recv().unwrap();
let (r_publish2, ack_token2) = unfiltered_rx.try_recv().unwrap();
let (r_publish3, ack_token3) = unfiltered_rx.try_recv().unwrap();
let (r_publish4, ack_token4) = unfiltered_rx.try_recv().unwrap();
assert!(mock_controller.ack_count() == 0);
let jh3 = tokio::task::spawn(ack_token3.unwrap().ack());
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(mock_controller.ack_count() == 0);
assert!(!jh3.is_finished());
let jh4 = tokio::task::spawn(ack_token4.unwrap().ack());
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(mock_controller.ack_count() == 0);
assert!(!jh4.is_finished());
let jh1 = tokio::task::spawn(ack_token1.unwrap().ack());
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(mock_controller.ack_count() == 1);
assert!(jh1.is_finished());
let jh2 = tokio::task::spawn(ack_token2.unwrap().ack());
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(mock_controller.ack_count() == 4);
assert!(jh2.is_finished());
assert!(jh3.is_finished());
assert!(jh4.is_finished());
let calls = mock_controller.call_sequence();
assert_eq!(calls.len(), 4);
match &calls[0] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, r_publish1);
}
_ => panic!("Expected AcknowledgePublish"),
}
match &calls[1] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, r_publish2);
}
_ => panic!("Expected AcknowledgePublish"),
}
match &calls[2] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, r_publish3);
}
_ => panic!("Expected AcknowledgePublish"),
}
match &calls[3] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, r_publish4);
}
_ => panic!("Expected AcknowledgePublish"),
}
}
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn ack_token_unordered_ack_multi_receiver(qos: QoS) {
let client = MockClient::new();
let mock_controller = client.mock_controller();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let mut unfiltered_rx1 = manager.lock().unwrap().create_unfiltered_receiver();
let mut unfiltered_rx2 = manager.lock().unwrap().create_unfiltered_receiver();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish1 = create_publish_qos(&topic_name, "payload 1", 1, qos);
let publish2 = create_publish_qos(&topic_name, "payload 2", 2, qos);
let publish3 = create_publish_qos(&topic_name, "payload 3", 3, qos);
let publish4 = create_publish_qos(&topic_name, "payload 4", 4, qos);
assert_eq!(dispatcher.dispatch_publish(&publish1).unwrap(), 2);
assert_eq!(dispatcher.dispatch_publish(&publish2).unwrap(), 2);
assert_eq!(dispatcher.dispatch_publish(&publish3).unwrap(), 2);
assert_eq!(dispatcher.dispatch_publish(&publish4).unwrap(), 2);
let (_r1_publish1, r1_ack_token1) = unfiltered_rx1.try_recv().unwrap();
let (_r1_publish2, r1_ack_token2) = unfiltered_rx1.try_recv().unwrap();
let (_r1_publish3, r1_ack_token3) = unfiltered_rx1.try_recv().unwrap();
let (_r1_publish4, r1_ack_token4) = unfiltered_rx1.try_recv().unwrap();
let (_r2_publish1, r2_ack_token1) = unfiltered_rx2.try_recv().unwrap();
let (_r2_publish2, r2_ack_token2) = unfiltered_rx2.try_recv().unwrap();
let (_r2_publish3, r2_ack_token3) = unfiltered_rx2.try_recv().unwrap();
let (_r2_publish4, r2_ack_token4) = unfiltered_rx2.try_recv().unwrap();
assert!(mock_controller.ack_count() == 0);
let jh3_r1 = tokio::task::spawn(r1_ack_token3.unwrap().ack());
let jh3_r2 = tokio::task::spawn(r2_ack_token3.unwrap().ack());
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(mock_controller.ack_count() == 0);
assert!(!jh3_r1.is_finished());
assert!(!jh3_r2.is_finished());
let jh4_r1 = tokio::task::spawn(r1_ack_token4.unwrap().ack());
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(mock_controller.ack_count() == 0);
assert!(!jh4_r1.is_finished());
let jh1_r1 = tokio::task::spawn(r1_ack_token1.unwrap().ack());
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(mock_controller.ack_count() == 0);
assert!(!jh1_r1.is_finished());
let jh1_r2 = tokio::task::spawn(r2_ack_token1.unwrap().ack());
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(mock_controller.ack_count() == 1);
assert!(jh1_r2.is_finished());
assert!(jh1_r1.is_finished());
let jh2_r1 = tokio::task::spawn(r1_ack_token2.unwrap().ack());
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(mock_controller.ack_count() == 1);
assert!(!jh2_r1.is_finished());
let jh4_r2 = tokio::task::spawn(r2_ack_token4.unwrap().ack());
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(mock_controller.ack_count() == 1);
assert!(!jh4_r2.is_finished());
let jh2_r2 = tokio::task::spawn(r2_ack_token2.unwrap().ack());
tokio::time::sleep(Duration::from_secs(1)).await;
assert!(mock_controller.ack_count() == 4);
assert!(jh2_r2.is_finished());
assert!(jh2_r1.is_finished());
assert!(jh3_r1.is_finished());
assert!(jh3_r2.is_finished());
assert!(jh4_r1.is_finished());
assert!(jh4_r2.is_finished());
let calls = mock_controller.call_sequence();
assert_eq!(calls.len(), 4);
match &calls[0] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, publish1);
}
_ => panic!("Expected AcknowledgePublish"),
}
match &calls[1] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, publish2);
}
_ => panic!("Expected AcknowledgePublish"),
}
match &calls[2] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, publish3);
}
_ => panic!("Expected AcknowledgePublish"),
}
match &calls[3] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, publish4);
}
_ => panic!("Expected AcknowledgePublish"),
}
}
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn ack_token_drop_before_ack_single_receiver(qos: QoS) {
let client = MockClient::new();
let mock_controller = client.mock_controller();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let mut unfiltered_rx = manager.lock().unwrap().create_unfiltered_receiver();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish = create_publish_qos(&topic_name, "publish 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 1);
let (r_publish, ack_token) = unfiltered_rx.try_recv().unwrap();
assert_eq!(mock_controller.ack_count(), 0);
drop(ack_token);
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 1);
let calls = mock_controller.call_sequence();
assert_eq!(calls.len(), 1);
match &calls[0] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, r_publish);
}
_ => panic!("Expected AcknowledgePublish"),
}
}
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn ack_token_drop_before_ack_multi_receiver(qos: QoS) {
let client = MockClient::new();
let mock_controller = client.mock_controller();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let mut unfiltered_rx1 = manager.lock().unwrap().create_unfiltered_receiver();
let mut unfiltered_rx2 = manager.lock().unwrap().create_unfiltered_receiver();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish = create_publish_qos(&topic_name, "publish 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 2);
let (_, ack_token1) = unfiltered_rx1.try_recv().unwrap();
let (_, ack_token2) = unfiltered_rx2.try_recv().unwrap();
assert_eq!(mock_controller.ack_count(), 0);
drop(ack_token1);
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 0);
ack_token2.unwrap().ack().await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 1);
let calls = mock_controller.call_sequence();
assert_eq!(calls.len(), 1);
match &calls[0] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, publish);
}
_ => panic!("Expected AcknowledgePublish"),
}
}
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn receiver_drop_before_ack_single_receiver(qos: QoS) {
let client = MockClient::new();
let mock_controller = client.mock_controller();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let unfiltered_rx = manager.lock().unwrap().create_unfiltered_receiver();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish = create_publish_qos(&topic_name, "publish 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 1);
assert_eq!(mock_controller.ack_count(), 0);
drop(unfiltered_rx);
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 1);
let calls = mock_controller.call_sequence();
assert_eq!(calls.len(), 1);
match &calls[0] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, publish);
}
_ => panic!("Expected AcknowledgePublish"),
}
}
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn receiver_drop_before_ack_multi_receiver(qos: QoS) {
let client = MockClient::new();
let mock_controller = client.mock_controller();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let unfiltered_rx1 = manager.lock().unwrap().create_unfiltered_receiver();
let mut unfiltered_rx2 = manager.lock().unwrap().create_unfiltered_receiver();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish = create_publish_qos(&topic_name, "publish 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 2);
assert_eq!(mock_controller.ack_count(), 0);
drop(unfiltered_rx1);
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 0);
let (_, ack_token) = unfiltered_rx2.try_recv().unwrap();
ack_token.unwrap().ack().await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 1);
let calls = mock_controller.call_sequence();
assert_eq!(calls.len(), 1);
match &calls[0] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, publish);
}
_ => panic!("Expected AcknowledgePublish"),
}
}
#[test_case(QoS::AtLeastOnce; "QoS 1")]
#[test_case(QoS::ExactlyOnce; "QoS 2")]
#[tokio::test]
async fn dispatcher_and_manager_drop(qos: QoS) {
let client = MockClient::new();
let mock_controller = client.mock_controller();
let mut dispatcher = IncomingPublishDispatcher::new(client);
let manager = dispatcher.get_receiver_manager();
let mut unfiltered_rx = manager.lock().unwrap().create_unfiltered_receiver();
let topic_name = TopicName::from_str("sport/tennis/player1").unwrap();
let publish = create_publish_qos(&topic_name, "publish 1", 1, qos);
assert_eq!(dispatcher.dispatch_publish(&publish).unwrap(), 1);
drop(dispatcher);
drop(manager);
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(mock_controller.ack_count(), 0);
let (r_publish, ack_token) = unfiltered_rx.try_recv().unwrap();
assert_eq!(r_publish, publish);
ack_token.unwrap().ack().await.unwrap();
assert_eq!(mock_controller.ack_count(), 1);
let calls = mock_controller.call_sequence();
assert_eq!(calls.len(), 1);
match &calls[0] {
MockClientCall::Ack(call) => {
assert_eq!(call.publish, publish);
}
_ => panic!("Expected AcknowledgePublish"),
}
let r_result = unfiltered_rx.recv().await;
assert!(r_result.is_none());
}
}