egui-test/src/sync.rs

209 lines
6.6 KiB
Rust

//! Synchronozation mechanism between gui thread and async runtime
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use crossbeam_channel::Sender;
use matrix_sdk::{
encryption::verification::{
QrVerification, SasVerification, Verification, VerificationRequest,
},
room::Joined,
ruma::{events::room::message::RoomMessageEventContent, RoomId, UserId},
Client, RoomMember,
};
use tokio::sync::{mpsc::UnboundedReceiver, Notify};
use url::Url;
static QUIT: Notify = Notify::const_new();
/// Request to perform an action or retrieve data.
pub enum Request {
/// Perform login
Login(Url, String, String),
/// Restore session
Restore(matrix_sdk::Session),
/// Get the calculated name for a room.
RoomName(RoomId),
/// Send a message to a room.
Message(Joined, String),
/// Get a member from a joined room
JoinedMember(Joined, UserId),
/// Get the verification request with the flow id.
VerifyRequest(UserId, String),
/// Cancel a verification attempt.
VerifyCancel(VerificationRequest),
/// Accept a verification request.
VerifyAccept(VerificationRequest),
/// Find an active verification with the flow id.
VerifyStart(UserId, String),
/// Start SAS verification flow.
VerifyStartSas(VerificationRequest),
VerifySasConfirm(SasVerification),
/// Start QR code verification flow.
VerifyStartQr(VerificationRequest),
/// Stop syncing
Quit,
}
/// Response produced while running the sync loop.
pub enum Response {
/// Response from the synchronization loop
Sync(matrix_sdk::deserialized_responses::SyncResponse),
/// Calculated the name for a room
RoomName(RoomId, String),
/// Retrived a member frmo a joined room.
JoinedMember(RoomId, RoomMember),
/// Got a verification request.
VerifyRequest(VerificationRequest),
/// Started SAS verification.
VerifySas(SasVerification),
/// Started QR verification.
VerifyQr(QrVerification),
/// An error happened while responding to a request
Error(matrix_sdk::Error),
}
/// Run the synchronization loop
#[allow(unused)]
pub fn run(client: Client, mut request: UnboundedReceiver<Request>, response: Sender<Response>) {
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(async move {
let quit = Arc::new(AtomicBool::new(false));
let sync_handle = {
let client = client.clone();
let response = response.clone();
let quit = quit.clone();
tokio::spawn(async move {
client
.sync_with_callback(Default::default(), |sync| async {
response.send(Response::Sync(sync)).ok();
match quit.load(Ordering::Acquire) {
false => matrix_sdk::LoopCtrl::Continue,
true => dbg!(matrix_sdk::LoopCtrl::Break),
}
})
.await
})
};
loop {
let request = tokio::select! {
request = request.recv() => match request {
Some(request) => request,
None => break,
},
notif = QUIT.notified() => break,
};
tokio::spawn(handle_request(
request,
client.clone(),
response.clone(),
quit.clone(),
));
}
sync_handle.await;
});
}
/// Process an incoming request.
async fn handle_request(
request: Request,
client: Client,
response: Sender<Response>,
quit: Arc<AtomicBool>,
) -> Result<(), SyncError> {
match request {
Request::Login(_, _, _) => todo!(),
Request::Restore(session) => client.restore_login(session).await?,
Request::RoomName(room_id) => match client.get_room(&room_id) {
Some(room) => response.send(Response::RoomName(room_id, room.display_name().await?))?,
None => (),
},
Request::JoinedMember(room, user) => {
if let Some(member) = room.get_member(&user).await? {
response.send(Response::JoinedMember(room.room_id().clone(), member))?;
}
}
Request::Message(room, message) => {
let event = RoomMessageEventContent::text_plain(message);
room.send(event, None).await?;
}
Request::VerifyRequest(user, flow_id) => {
let verification = client.get_verification_request(&user, &flow_id).await;
if let Some(verification) = verification {
response.send(Response::VerifyRequest(verification))?;
}
}
Request::VerifyCancel(verify) => {
verify.cancel().await?;
}
Request::VerifyAccept(verify) => {
verify.accept().await?;
}
Request::VerifyStart(sender, flow_id) => {
if let Some(verify) = client.get_verification(&sender, &flow_id).await {
match verify {
// TODO: auto-accepting is very naughty
Verification::SasV1(sas) => {
sas.accept().await?;
response.send(Response::VerifySas(sas))?
}
Verification::QrV1(qr) => response.send(Response::VerifyQr(qr))?,
};
}
}
Request::VerifyStartSas(verify) => {
if let Some(sas) = verify.start_sas().await? {
response.send(Response::VerifySas(sas))?;
}
}
Request::VerifySasConfirm(sas) => {
sas.confirm().await?;
}
Request::VerifyStartQr(verify) => {
if let Some(qr) = verify.generate_qr_code().await? {
response.send(Response::VerifyQr(qr))?;
}
}
Request::Quit => {
dbg!(quit.store(true, Ordering::SeqCst));
QUIT.notify_one();
}
};
Ok(())
}
#[derive(Debug)]
enum SyncError {
Sdk(matrix_sdk::Error),
Send,
}
impl std::fmt::Display for SyncError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "sync error")
}
}
impl std::error::Error for SyncError {}
impl From<matrix_sdk::Error> for SyncError {
fn from(e: matrix_sdk::Error) -> Self {
Self::Sdk(e)
}
}
impl<T> From<crossbeam_channel::SendError<T>> for SyncError {
fn from(_: crossbeam_channel::SendError<T>) -> Self {
Self::Send
}
}
impl From<matrix_sdk::StoreError> for SyncError {
fn from(e: matrix_sdk::StoreError) -> Self {
Self::Sdk(e.into())
}
}