From 061276524a0d9a14f5d9ed9a1bad1f1a0e713f88 Mon Sep 17 00:00:00 2001 From: Amanda Graven Date: Wed, 2 Dec 2020 06:52:21 +0100 Subject: [PATCH] Implement subscription for matrix events w channel --- Cargo.toml | 11 ++- src/main.rs | 3 +- src/matrix.rs | 82 ++++++++++++++++++- src/ui.rs | 223 ++++++++++++++++++++++++++++++++++++++------------ 4 files changed, 260 insertions(+), 59 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8c4a013..4c68f6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,10 +8,15 @@ edition = "2018" [dependencies] anyhow = "1.0" +async-trait = "0.1" +crossbeam-channel = "0.4" dirs-next = "2.0" -iced = { git = "https://github.com/hecrj/iced", rev = "fc4270f", features = ["debug", "tokio"] } +futures = "0.3" +iced = { version = "0.2", features = ["debug", "tokio_old"] } +iced_futures = "0.2" hostname = "0.3" -matrix-sdk = { git = "https://github.com/matrix-org/matrix-rust-sdk", rev = "27c6f30" } +matrix-sdk = { git = "https://github.com/matrix-org/matrix-rust-sdk", rev = "e65915e" } +matrix-sdk-common-macros = { git = "https://github.com/matrix-org/matrix-rust-sdk", rev = "e65915e" } serde = { version = "1.0", features = ["derive"] } -tokio = "0.2" +tokio = { version = "0.2", features = ["sync"] } toml = "0.5" diff --git a/src/main.rs b/src/main.rs index 2ac5c63..566c985 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +extern crate crossbeam_channel as channel; extern crate dirs_next as dirs; use std::fs::Permissions; @@ -16,7 +17,7 @@ fn main() -> Result<(), Box> { std::fs::set_permissions(&config_dir, Permissions::from_mode(0o700))?; } - ui::Retrix::run(iced::Settings::default()); + ui::Retrix::run(iced::Settings::default())?; Ok(()) } diff --git a/src/matrix.rs b/src/matrix.rs index 408a7f6..4c7a4df 100644 --- a/src/matrix.rs +++ b/src/matrix.rs @@ -1,5 +1,6 @@ use matrix_sdk::{ - identifiers::DeviceId, identifiers::UserId, reqwest::Url, Client, ClientConfig, SyncSettings, + events::AnyRoomEvent, events::AnySyncRoomEvent, identifiers::DeviceId, identifiers::UserId, + reqwest::Url, Client, ClientConfig, LoopCtrl, SyncSettings, }; use serde::{Deserialize, Serialize}; @@ -94,3 +95,82 @@ fn write_session(session: &Session) -> Result<(), Error> { Ok(()) } + +pub struct MatrixSync { + client: matrix_sdk::Client, + //id: String, +} + +impl MatrixSync { + pub fn subscription(client: matrix_sdk::Client) -> iced::Subscription { + iced::Subscription::from_recipe(MatrixSync { client }) + } +} + +/*#[async_trait] +impl EventEmitter for Callback { + async fn on_room_message(&self, room: SyncRoom, event: &SyncMessageEvent) { + let room_id = if let matrix_sdk::RoomState::Joined(arc) = room { + let room = arc.read().await; + room.room_id.clone() + } else { + return; + }; + self.sender + .send(event.clone().into_full_event(room_id)) + .ok(); + } +}*/ + +impl iced_futures::subscription::Recipe for MatrixSync +where + H: std::hash::Hasher, +{ + type Output = AnyRoomEvent; + + fn hash(&self, state: &mut H) { + use std::hash::Hash; + + std::any::TypeId::of::().hash(state); + //self.id.hash(state); + } + + fn stream( + self: Box, + _input: iced_futures::BoxStream, + ) -> iced_futures::BoxStream { + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); + let client = self.client.clone(); + tokio::task::spawn(async move { + client + .sync_with_callback(SyncSettings::new(), |response| async { + for (room_id, room) in response.rooms.join { + for event in room.timeline.events { + if let Ok(event) = event.deserialize() { + let room_id = room_id.clone(); + let event = match event { + AnySyncRoomEvent::Message(e) => { + AnyRoomEvent::Message(e.into_full_event(room_id)) + } + AnySyncRoomEvent::State(e) => { + AnyRoomEvent::State(e.into_full_event(room_id)) + } + AnySyncRoomEvent::RedactedMessage(e) => { + AnyRoomEvent::RedactedMessage(e.into_full_event(room_id)) + } + AnySyncRoomEvent::RedactedState(e) => { + AnyRoomEvent::RedactedState(e.into_full_event(room_id)) + } + }; + sender.send(event).ok(); + } + } + } + + LoopCtrl::Continue + }) + .await; + }); + Box::pin(receiver) + } +} diff --git a/src/ui.rs b/src/ui.rs index d0bcb77..56cde8c 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -1,6 +1,16 @@ +use std::collections::{BTreeMap, HashMap}; + use iced::{ text_input::{self, TextInput}, - Application, Button, Column, Command, Container, Element, Length, Scrollable, Text, + Application, Button, Column, Command, Container, Element, Length, Row, Rule, Scrollable, + Subscription, Text, +}; +use matrix_sdk::{ + events::{ + room::message::MessageEventContent, AnyPossiblyRedactedSyncMessageEvent, AnyRoomEvent, + AnySyncMessageEvent, + }, + identifiers::RoomId, }; use crate::matrix; @@ -18,12 +28,15 @@ pub enum Retrix { server: String, error: Option, }, - AwaitLogin, + AwaitLogin(std::time::Instant), LoggedIn { client: matrix_sdk::Client, session: matrix::Session, - rooms: Vec, + rooms: BTreeMap, + buttons: HashMap, + messages: BTreeMap, + selected: Option, room_scroll: iced::scrollable::State, }, } @@ -55,7 +68,9 @@ pub enum Message { LoginFailed(String), // Main state messages - ResetRooms(Vec), + ResetRooms(BTreeMap), + SelectRoom(RoomId), + Sync(AnyRoomEvent), } impl Application for Retrix { @@ -74,7 +89,7 @@ impl Application for Retrix { Err(e) => Message::LoginFailed(e.to_string()), }, ); - (Retrix::AwaitLogin, command) + (Retrix::AwaitLogin(std::time::Instant::now()), command) } None => (Retrix::new_prompt(), Command::none()), } @@ -84,12 +99,21 @@ impl Application for Retrix { String::from("Retrix matrix client") } + fn subscription(&self) -> Subscription { + match self { + Retrix::LoggedIn { client, .. } => { + matrix::MatrixSync::subscription(client.clone()).map(Message::Sync) + } + _ => Subscription::none(), + } + } + fn update(&mut self, message: Self::Message) -> Command { - match *self { + match self { Retrix::Prompt { - ref mut user, - ref mut password, - ref mut server, + user, + password, + server, .. } => match message { Message::SetUser(u) => *user = u, @@ -99,7 +123,7 @@ impl Application for Retrix { let user = user.clone(); let password = password.clone(); let server = server.clone(); - *self = Retrix::AwaitLogin; + *self = Retrix::AwaitLogin(std::time::Instant::now()); return Command::perform( async move { matrix::login(&user, &password, &server).await }, |result| match result { @@ -110,7 +134,7 @@ impl Application for Retrix { } _ => (), }, - Retrix::AwaitLogin => match message { + Retrix::AwaitLogin(_) => match message { Message::LoginFailed(e) => { *self = Retrix::new_prompt(); if let Retrix::Prompt { ref mut error, .. } = *self { @@ -121,26 +145,32 @@ impl Application for Retrix { *self = Retrix::LoggedIn { client: client.clone(), session, - rooms: Vec::new(), + rooms: BTreeMap::new(), + selected: None, room_scroll: Default::default(), + buttons: Default::default(), + messages: Default::default(), }; let client = client.clone(); return Command::perform( async move { - let mut list = Vec::new(); - for (_, room) in client.joined_rooms().read().await.iter() { + let mut rooms = BTreeMap::new(); + for (id, room) in client.joined_rooms().read().await.iter() { let name = room.read().await.display_name(); - list.push(name); + rooms.insert(id.to_owned(), name); } - list + rooms }, |rooms| Message::ResetRooms(rooms), ); } _ => (), }, - Retrix::LoggedIn { ref mut rooms, .. } => match message { + Retrix::LoggedIn { + rooms, selected, .. + } => match message { Message::ResetRooms(r) => *rooms = r, + Message::SelectRoom(r) => *selected = Some(r), _ => (), }, }; @@ -148,39 +178,32 @@ impl Application for Retrix { } fn view(&mut self) -> Element { - match *self { + match self { Retrix::Prompt { - ref mut user_input, - ref mut password_input, - ref mut server_input, - ref mut login_button, - ref user, - ref password, - ref server, - ref error, + user_input, + password_input, + server_input, + login_button, + user, + password, + server, + error, } => { // Login form let mut content = Column::new() .width(500.into()) .push(Text::new("Username")) - .push( - TextInput::new(user_input, "Username", user, |val| Message::SetUser(val)) - .padding(5), - ) + .push(TextInput::new(user_input, "Username", user, Message::SetUser).padding(5)) .push(Text::new("Password")) .push( - TextInput::new(password_input, "Password", password, |val| { - Message::SetPassword(val) - }) - .password() - .padding(5), + TextInput::new(password_input, "Password", password, Message::SetPassword) + .password() + .padding(5), ) .push(Text::new("Homeserver")) .push( - TextInput::new(server_input, "Server", server, |val| { - Message::SetServer(val) - }) - .padding(5), + TextInput::new(server_input, "Server", server, Message::SetServer) + .padding(5), ) .push(Button::new(login_button, Text::new("Login")).on_press(Message::Login)); if let Some(ref error) = error { @@ -194,29 +217,121 @@ impl Application for Retrix { .height(iced::Length::Fill) .into() } - Retrix::AwaitLogin => Container::new(Text::new("Logging in...")) - .center_x() - .center_y() - .width(Length::Fill) - .height(Length::Fill) - .into(), + Retrix::AwaitLogin(instant) => Container::new(Text::new(format!( + "Logging in{}", + match instant.elapsed().subsec_millis() / 333 { + 0 => ".", + 1 => "..", + 2 => "...", + _ => "....", + } + ))) + .center_x() + .center_y() + .width(Length::Fill) + .height(Length::Fill) + .into(), Retrix::LoggedIn { - ref rooms, - ref mut room_scroll, + client, + room_scroll, + buttons, + selected, .. } => { - //let mut root_row = Row::new().width(Length::Fill).height(Length::Fill); + let mut root_row = Row::new().width(Length::Fill).height(Length::Fill); + + // Room list + let joined = client.joined_rooms(); + let rooms = futures::executor::block_on(async { joined.read().await }); let mut room_col = Scrollable::new(room_scroll) .width(400.into()) .height(Length::Fill) - .spacing(15); - for room in rooms { - room_col = room_col.push(Text::new(room)); + .scrollbar_width(5); + // We have to iterate the buttons map and not the other way around to make the + // borrow checker happy. First we make sure there's a button entry for every room + // entry, and clean up button entries from removed rooms. + for (id, _) in rooms.iter() { + buttons.entry(id.to_owned()).or_default(); } - room_col.into() - //root_row = root_row.push(room_col); - //root_row.into() + buttons.retain(|id, _| rooms.contains_key(id)); + // Then we make our buttons + let buttons: Vec> = buttons + .iter_mut() + .map(|(id, state)| { + // Get read lock for the room + let room = futures::executor::block_on(async { + rooms.get(id).unwrap().read().await + }); + Button::new(state, Text::new(room.display_name())) + .on_press(Message::SelectRoom(id.to_owned())) + .width(400.into()) + }) + .collect(); + // Then we add them to our room column. What a mess. + for button in buttons { + room_col = room_col.push(button); + } + root_row = root_row.push(room_col); + + // Messages. + // + // Get selected room. + let selected_room = selected.as_ref().and_then(|selected| { + futures::executor::block_on(async { + match rooms.get(selected) { + Some(room) => Some(room.read().await), + None => None, + } + }) + }); + if let Some(room) = selected_room { + let mut col = Column::new() + .spacing(5) + .padding(5) + .push(Text::new(room.display_name()).size(25)) + .push(Rule::horizontal(2)); + for message in room.messages.iter() { + if let AnyPossiblyRedactedSyncMessageEvent::Regular(event) = message { + match event { + AnySyncMessageEvent::RoomMessage(room_message) => { + match &room_message.content { + MessageEventContent::Text(text) => { + let row = Row::new() + .spacing(5) + .push( + Text::new(room_message.sender.localpart()) + .color([0.2, 0.2, 1.0]), + ) + .push(Text::new(&text.body).width(Length::Fill)) + .push(Text::new(format_systime( + room_message.origin_server_ts, + ))); + col = col.push(row); + } + _ => (), + } + } + _ => (), + } + } + } + root_row = root_row.push(col); + } + + root_row.into() } } } } + +fn format_systime(time: std::time::SystemTime) -> String { + let secs = time + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + format!( + "{:02}:{:02}", + (secs % (60 * 60 * 24)) / (60 * 60), + (secs % (60 * 60)) / 60 + ) +}