egui-test/src/sync.rs

130 lines
3.6 KiB
Rust

//! Synchronozation mechanism between gui thread and async runtime
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use crossbeam_channel::Sender;
use matrix_sdk::{ruma::RoomId, Client};
use tokio::sync::{mpsc::UnboundedReceiver, Notify};
use url::Url;
static QUIT: Notify = Notify::const_new();
/// Request to perform an action or retrieve data.
pub enum Request {
/// Perform login
Login(Url, String, String),
/// Restore session
Restore(matrix_sdk::Session),
/// Get the calculated name for a room
RoomName(RoomId),
/// Stop syncing
Quit,
}
/// Response to a request.
pub enum Response {
/// Response from the synchronization loop
Sync(matrix_sdk::deserialized_responses::SyncResponse),
/// Calculated the name for a room
RoomName(RoomId, String),
/// An error happened while responding to a request
Error(matrix_sdk::Error),
}
/// Run the synchronization loop
#[allow(unused)]
pub fn run(client: Client, mut request: UnboundedReceiver<Request>, response: Sender<Response>) {
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(async move {
let quit = Arc::new(AtomicBool::new(false));
let sync_handle = {
let client = client.clone();
let response = response.clone();
let quit = quit.clone();
tokio::spawn(async move {
client
.sync_with_callback(Default::default(), |sync| async {
response.send(Response::Sync(sync)).ok();
match quit.load(Ordering::Acquire) {
false => matrix_sdk::LoopCtrl::Continue,
true => dbg!(matrix_sdk::LoopCtrl::Break),
}
})
.await
})
};
loop {
let request = tokio::select! {
request = request.recv() => match request {
Some(request) => request,
None => break,
},
notif = QUIT.notified() => break,
};
tokio::spawn(handle_request(
request,
client.clone(),
response.clone(),
quit.clone(),
));
}
sync_handle.await;
});
}
async fn handle_request(
request: Request,
client: Client,
response: Sender<Response>,
quit: Arc<AtomicBool>,
) -> Result<(), SyncError> {
match request {
Request::Login(_, _, _) => todo!(),
Request::Restore(session) => client.restore_login(session).await?,
Request::RoomName(room_id) => match client.get_room(&room_id) {
Some(room) => response.send(Response::RoomName(room_id, room.display_name().await?))?,
None => (),
},
Request::Quit => {
dbg!(quit.store(true, Ordering::SeqCst));
QUIT.notify_one();
}
};
Ok(())
}
#[derive(Debug)]
enum SyncError {
Sdk(matrix_sdk::Error),
Send,
}
impl std::fmt::Display for SyncError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "sync error")
}
}
impl std::error::Error for SyncError {}
impl From<matrix_sdk::Error> for SyncError {
fn from(e: matrix_sdk::Error) -> Self {
Self::Sdk(e)
}
}
impl<T> From<crossbeam_channel::SendError<T>> for SyncError {
fn from(_: crossbeam_channel::SendError<T>) -> Self {
Self::Send
}
}
impl From<matrix_sdk::StoreError> for SyncError {
fn from(e: matrix_sdk::StoreError) -> Self {
Self::Sdk(e.into())
}
}