209 lines
6.6 KiB
Rust
209 lines
6.6 KiB
Rust
//! Synchronozation mechanism between gui thread and async runtime
|
|
|
|
use std::sync::{
|
|
atomic::{AtomicBool, Ordering},
|
|
Arc,
|
|
};
|
|
|
|
use crossbeam_channel::Sender;
|
|
use matrix_sdk::{
|
|
encryption::verification::{
|
|
QrVerification, SasVerification, Verification, VerificationRequest,
|
|
},
|
|
room::Joined,
|
|
ruma::{events::room::message::RoomMessageEventContent, RoomId, UserId},
|
|
Client, RoomMember,
|
|
};
|
|
use tokio::sync::{mpsc::UnboundedReceiver, Notify};
|
|
use url::Url;
|
|
|
|
static QUIT: Notify = Notify::const_new();
|
|
|
|
/// Request to perform an action or retrieve data.
|
|
pub enum Request {
|
|
/// Perform login
|
|
Login(Url, String, String),
|
|
/// Restore session
|
|
Restore(matrix_sdk::Session),
|
|
/// Get the calculated name for a room.
|
|
RoomName(RoomId),
|
|
/// Send a message to a room.
|
|
Message(Joined, String),
|
|
/// Get a member from a joined room
|
|
JoinedMember(Joined, UserId),
|
|
/// Get the verification request with the flow id.
|
|
VerifyRequest(UserId, String),
|
|
/// Cancel a verification attempt.
|
|
VerifyCancel(VerificationRequest),
|
|
/// Accept a verification request.
|
|
VerifyAccept(VerificationRequest),
|
|
/// Find an active verification with the flow id.
|
|
VerifyStart(UserId, String),
|
|
/// Start SAS verification flow.
|
|
VerifyStartSas(VerificationRequest),
|
|
VerifySasConfirm(SasVerification),
|
|
/// Start QR code verification flow.
|
|
VerifyStartQr(VerificationRequest),
|
|
/// Stop syncing
|
|
Quit,
|
|
}
|
|
|
|
/// Response produced while running the sync loop.
|
|
pub enum Response {
|
|
/// Response from the synchronization loop
|
|
Sync(matrix_sdk::deserialized_responses::SyncResponse),
|
|
/// Calculated the name for a room
|
|
RoomName(RoomId, String),
|
|
/// Retrived a member frmo a joined room.
|
|
JoinedMember(RoomId, RoomMember),
|
|
/// Got a verification request.
|
|
VerifyRequest(VerificationRequest),
|
|
/// Started SAS verification.
|
|
VerifySas(SasVerification),
|
|
/// Started QR verification.
|
|
VerifyQr(QrVerification),
|
|
/// An error happened while responding to a request
|
|
Error(matrix_sdk::Error),
|
|
}
|
|
|
|
/// Run the synchronization loop
|
|
#[allow(unused)]
|
|
pub fn run(client: Client, mut request: UnboundedReceiver<Request>, response: Sender<Response>) {
|
|
let runtime = tokio::runtime::Runtime::new().unwrap();
|
|
runtime.block_on(async move {
|
|
let quit = Arc::new(AtomicBool::new(false));
|
|
let sync_handle = {
|
|
let client = client.clone();
|
|
let response = response.clone();
|
|
let quit = quit.clone();
|
|
tokio::spawn(async move {
|
|
client
|
|
.sync_with_callback(Default::default(), |sync| async {
|
|
response.send(Response::Sync(sync)).ok();
|
|
match quit.load(Ordering::Acquire) {
|
|
false => matrix_sdk::LoopCtrl::Continue,
|
|
true => dbg!(matrix_sdk::LoopCtrl::Break),
|
|
}
|
|
})
|
|
.await
|
|
})
|
|
};
|
|
loop {
|
|
let request = tokio::select! {
|
|
request = request.recv() => match request {
|
|
Some(request) => request,
|
|
None => break,
|
|
},
|
|
notif = QUIT.notified() => break,
|
|
};
|
|
tokio::spawn(handle_request(
|
|
request,
|
|
client.clone(),
|
|
response.clone(),
|
|
quit.clone(),
|
|
));
|
|
}
|
|
sync_handle.await;
|
|
});
|
|
}
|
|
|
|
/// Process an incoming request.
|
|
async fn handle_request(
|
|
request: Request,
|
|
client: Client,
|
|
response: Sender<Response>,
|
|
quit: Arc<AtomicBool>,
|
|
) -> Result<(), SyncError> {
|
|
match request {
|
|
Request::Login(_, _, _) => todo!(),
|
|
Request::Restore(session) => client.restore_login(session).await?,
|
|
Request::RoomName(room_id) => match client.get_room(&room_id) {
|
|
Some(room) => response.send(Response::RoomName(room_id, room.display_name().await?))?,
|
|
None => (),
|
|
},
|
|
Request::JoinedMember(room, user) => {
|
|
if let Some(member) = room.get_member(&user).await? {
|
|
response.send(Response::JoinedMember(room.room_id().clone(), member))?;
|
|
}
|
|
}
|
|
Request::Message(room, message) => {
|
|
let event = RoomMessageEventContent::text_plain(message);
|
|
room.send(event, None).await?;
|
|
}
|
|
Request::VerifyRequest(user, flow_id) => {
|
|
let verification = client.get_verification_request(&user, &flow_id).await;
|
|
if let Some(verification) = verification {
|
|
response.send(Response::VerifyRequest(verification))?;
|
|
}
|
|
}
|
|
Request::VerifyCancel(verify) => {
|
|
verify.cancel().await?;
|
|
}
|
|
Request::VerifyAccept(verify) => {
|
|
verify.accept().await?;
|
|
}
|
|
Request::VerifyStart(sender, flow_id) => {
|
|
if let Some(verify) = client.get_verification(&sender, &flow_id).await {
|
|
match verify {
|
|
// TODO: auto-accepting is very naughty
|
|
Verification::SasV1(sas) => {
|
|
sas.accept().await?;
|
|
response.send(Response::VerifySas(sas))?
|
|
}
|
|
Verification::QrV1(qr) => response.send(Response::VerifyQr(qr))?,
|
|
};
|
|
}
|
|
}
|
|
Request::VerifyStartSas(verify) => {
|
|
if let Some(sas) = verify.start_sas().await? {
|
|
response.send(Response::VerifySas(sas))?;
|
|
}
|
|
}
|
|
Request::VerifySasConfirm(sas) => {
|
|
sas.confirm().await?;
|
|
}
|
|
Request::VerifyStartQr(verify) => {
|
|
if let Some(qr) = verify.generate_qr_code().await? {
|
|
response.send(Response::VerifyQr(qr))?;
|
|
}
|
|
}
|
|
Request::Quit => {
|
|
dbg!(quit.store(true, Ordering::SeqCst));
|
|
QUIT.notify_one();
|
|
}
|
|
};
|
|
Ok(())
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
enum SyncError {
|
|
Sdk(matrix_sdk::Error),
|
|
Send,
|
|
}
|
|
|
|
impl std::fmt::Display for SyncError {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
write!(f, "sync error")
|
|
}
|
|
}
|
|
|
|
impl std::error::Error for SyncError {}
|
|
|
|
impl From<matrix_sdk::Error> for SyncError {
|
|
fn from(e: matrix_sdk::Error) -> Self {
|
|
Self::Sdk(e)
|
|
}
|
|
}
|
|
|
|
impl<T> From<crossbeam_channel::SendError<T>> for SyncError {
|
|
fn from(_: crossbeam_channel::SendError<T>) -> Self {
|
|
Self::Send
|
|
}
|
|
}
|
|
|
|
impl From<matrix_sdk::StoreError> for SyncError {
|
|
fn from(e: matrix_sdk::StoreError) -> Self {
|
|
Self::Sdk(e.into())
|
|
}
|
|
}
|