//! Synchronozation mechanism between gui thread and async runtime use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; use crossbeam_channel::Sender; use matrix_sdk::{ encryption::verification::{ QrVerification, SasVerification, Verification, VerificationRequest, }, room::Joined, ruma::{events::room::message::RoomMessageEventContent, RoomId, UserId}, Client, RoomMember, }; use tokio::sync::{mpsc::UnboundedReceiver, Notify}; use url::Url; static QUIT: Notify = Notify::const_new(); /// Request to perform an action or retrieve data. pub enum Request { /// Perform login Login(Url, String, String), /// Restore session Restore(matrix_sdk::Session), /// Get the calculated name for a room. RoomName(RoomId), /// Send a message to a room. Message(Joined, String), /// Get a member from a joined room JoinedMember(Joined, UserId), /// Get the verification request with the flow id. VerifyRequest(UserId, String), /// Cancel a verification attempt. VerifyCancel(VerificationRequest), /// Accept a verification request. VerifyAccept(VerificationRequest), /// Find an active verification with the flow id. VerifyStart(UserId, String), /// Start SAS verification flow. VerifyStartSas(VerificationRequest), VerifySasConfirm(SasVerification), /// Start QR code verification flow. VerifyStartQr(VerificationRequest), /// Stop syncing Quit, } /// Response produced while running the sync loop. pub enum Response { /// Response from the synchronization loop Sync(matrix_sdk::deserialized_responses::SyncResponse), /// Calculated the name for a room RoomName(RoomId, String), /// Retrived a member frmo a joined room. JoinedMember(RoomId, RoomMember), /// Got a verification request. VerifyRequest(VerificationRequest), /// Started SAS verification. VerifySas(SasVerification), /// Started QR verification. VerifyQr(QrVerification), /// An error happened while responding to a request Error(matrix_sdk::Error), } /// Run the synchronization loop #[allow(unused)] pub fn run(client: Client, mut request: UnboundedReceiver, response: Sender) { let runtime = tokio::runtime::Runtime::new().unwrap(); runtime.block_on(async move { let quit = Arc::new(AtomicBool::new(false)); let sync_handle = { let client = client.clone(); let response = response.clone(); let quit = quit.clone(); tokio::spawn(async move { client .sync_with_callback(Default::default(), |sync| async { response.send(Response::Sync(sync)).ok(); match quit.load(Ordering::Acquire) { false => matrix_sdk::LoopCtrl::Continue, true => dbg!(matrix_sdk::LoopCtrl::Break), } }) .await }) }; loop { let request = tokio::select! { request = request.recv() => match request { Some(request) => request, None => break, }, notif = QUIT.notified() => break, }; tokio::spawn(handle_request( request, client.clone(), response.clone(), quit.clone(), )); } sync_handle.await; }); } /// Process an incoming request. async fn handle_request( request: Request, client: Client, response: Sender, quit: Arc, ) -> Result<(), SyncError> { match request { Request::Login(_, _, _) => todo!(), Request::Restore(session) => client.restore_login(session).await?, Request::RoomName(room_id) => match client.get_room(&room_id) { Some(room) => response.send(Response::RoomName(room_id, room.display_name().await?))?, None => (), }, Request::JoinedMember(room, user) => { if let Some(member) = room.get_member(&user).await? { response.send(Response::JoinedMember(room.room_id().clone(), member))?; } } Request::Message(room, message) => { let event = RoomMessageEventContent::text_plain(message); room.send(event, None).await?; } Request::VerifyRequest(user, flow_id) => { let verification = client.get_verification_request(&user, &flow_id).await; if let Some(verification) = verification { response.send(Response::VerifyRequest(verification))?; } } Request::VerifyCancel(verify) => { verify.cancel().await?; } Request::VerifyAccept(verify) => { verify.accept().await?; } Request::VerifyStart(sender, flow_id) => { if let Some(verify) = client.get_verification(&sender, &flow_id).await { match verify { // TODO: auto-accepting is very naughty Verification::SasV1(sas) => { sas.accept().await?; response.send(Response::VerifySas(sas))? } Verification::QrV1(qr) => response.send(Response::VerifyQr(qr))?, }; } } Request::VerifyStartSas(verify) => { if let Some(sas) = verify.start_sas().await? { response.send(Response::VerifySas(sas))?; } } Request::VerifySasConfirm(sas) => { sas.confirm().await?; } Request::VerifyStartQr(verify) => { if let Some(qr) = verify.generate_qr_code().await? { response.send(Response::VerifyQr(qr))?; } } Request::Quit => { dbg!(quit.store(true, Ordering::SeqCst)); QUIT.notify_one(); } }; Ok(()) } #[derive(Debug)] enum SyncError { Sdk(matrix_sdk::Error), Send, } impl std::fmt::Display for SyncError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "sync error") } } impl std::error::Error for SyncError {} impl From for SyncError { fn from(e: matrix_sdk::Error) -> Self { Self::Sdk(e) } } impl From> for SyncError { fn from(_: crossbeam_channel::SendError) -> Self { Self::Send } } impl From for SyncError { fn from(e: matrix_sdk::StoreError) -> Self { Self::Sdk(e.into()) } }