Implement subscription for matrix events w channel

This commit is contained in:
Amanda Graven 2020-12-02 06:52:21 +01:00
parent 80f4ab6451
commit 061276524a
Signed by: amanda
GPG key ID: 45C461CDC9286390
4 changed files with 260 additions and 59 deletions

View file

@ -8,10 +8,15 @@ edition = "2018"
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
crossbeam-channel = "0.4"
dirs-next = "2.0"
iced = { git = "https://github.com/hecrj/iced", rev = "fc4270f", features = ["debug", "tokio"] }
futures = "0.3"
iced = { version = "0.2", features = ["debug", "tokio_old"] }
iced_futures = "0.2"
hostname = "0.3"
matrix-sdk = { git = "https://github.com/matrix-org/matrix-rust-sdk", rev = "27c6f30" }
matrix-sdk = { git = "https://github.com/matrix-org/matrix-rust-sdk", rev = "e65915e" }
matrix-sdk-common-macros = { git = "https://github.com/matrix-org/matrix-rust-sdk", rev = "e65915e" }
serde = { version = "1.0", features = ["derive"] }
tokio = "0.2"
tokio = { version = "0.2", features = ["sync"] }
toml = "0.5"

View file

@ -1,3 +1,4 @@
extern crate crossbeam_channel as channel;
extern crate dirs_next as dirs;
use std::fs::Permissions;
@ -16,7 +17,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
std::fs::set_permissions(&config_dir, Permissions::from_mode(0o700))?;
}
ui::Retrix::run(iced::Settings::default());
ui::Retrix::run(iced::Settings::default())?;
Ok(())
}

View file

@ -1,5 +1,6 @@
use matrix_sdk::{
identifiers::DeviceId, identifiers::UserId, reqwest::Url, Client, ClientConfig, SyncSettings,
events::AnyRoomEvent, events::AnySyncRoomEvent, identifiers::DeviceId, identifiers::UserId,
reqwest::Url, Client, ClientConfig, LoopCtrl, SyncSettings,
};
use serde::{Deserialize, Serialize};
@ -94,3 +95,82 @@ fn write_session(session: &Session) -> Result<(), Error> {
Ok(())
}
pub struct MatrixSync {
client: matrix_sdk::Client,
//id: String,
}
impl MatrixSync {
pub fn subscription(client: matrix_sdk::Client) -> iced::Subscription<AnyRoomEvent> {
iced::Subscription::from_recipe(MatrixSync { client })
}
}
/*#[async_trait]
impl EventEmitter for Callback {
async fn on_room_message(&self, room: SyncRoom, event: &SyncMessageEvent<MessageEventContent>) {
let room_id = if let matrix_sdk::RoomState::Joined(arc) = room {
let room = arc.read().await;
room.room_id.clone()
} else {
return;
};
self.sender
.send(event.clone().into_full_event(room_id))
.ok();
}
}*/
impl<H, I> iced_futures::subscription::Recipe<H, I> for MatrixSync
where
H: std::hash::Hasher,
{
type Output = AnyRoomEvent;
fn hash(&self, state: &mut H) {
use std::hash::Hash;
std::any::TypeId::of::<Self>().hash(state);
//self.id.hash(state);
}
fn stream(
self: Box<Self>,
_input: iced_futures::BoxStream<I>,
) -> iced_futures::BoxStream<Self::Output> {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let client = self.client.clone();
tokio::task::spawn(async move {
client
.sync_with_callback(SyncSettings::new(), |response| async {
for (room_id, room) in response.rooms.join {
for event in room.timeline.events {
if let Ok(event) = event.deserialize() {
let room_id = room_id.clone();
let event = match event {
AnySyncRoomEvent::Message(e) => {
AnyRoomEvent::Message(e.into_full_event(room_id))
}
AnySyncRoomEvent::State(e) => {
AnyRoomEvent::State(e.into_full_event(room_id))
}
AnySyncRoomEvent::RedactedMessage(e) => {
AnyRoomEvent::RedactedMessage(e.into_full_event(room_id))
}
AnySyncRoomEvent::RedactedState(e) => {
AnyRoomEvent::RedactedState(e.into_full_event(room_id))
}
};
sender.send(event).ok();
}
}
}
LoopCtrl::Continue
})
.await;
});
Box::pin(receiver)
}
}

223
src/ui.rs
View file

@ -1,6 +1,16 @@
use std::collections::{BTreeMap, HashMap};
use iced::{
text_input::{self, TextInput},
Application, Button, Column, Command, Container, Element, Length, Scrollable, Text,
Application, Button, Column, Command, Container, Element, Length, Row, Rule, Scrollable,
Subscription, Text,
};
use matrix_sdk::{
events::{
room::message::MessageEventContent, AnyPossiblyRedactedSyncMessageEvent, AnyRoomEvent,
AnySyncMessageEvent,
},
identifiers::RoomId,
};
use crate::matrix;
@ -18,12 +28,15 @@ pub enum Retrix {
server: String,
error: Option<String>,
},
AwaitLogin,
AwaitLogin(std::time::Instant),
LoggedIn {
client: matrix_sdk::Client,
session: matrix::Session,
rooms: Vec<String>,
rooms: BTreeMap<RoomId, String>,
buttons: HashMap<RoomId, iced::button::State>,
messages: BTreeMap<RoomId, MessageEventContent>,
selected: Option<RoomId>,
room_scroll: iced::scrollable::State,
},
}
@ -55,7 +68,9 @@ pub enum Message {
LoginFailed(String),
// Main state messages
ResetRooms(Vec<String>),
ResetRooms(BTreeMap<RoomId, String>),
SelectRoom(RoomId),
Sync(AnyRoomEvent),
}
impl Application for Retrix {
@ -74,7 +89,7 @@ impl Application for Retrix {
Err(e) => Message::LoginFailed(e.to_string()),
},
);
(Retrix::AwaitLogin, command)
(Retrix::AwaitLogin(std::time::Instant::now()), command)
}
None => (Retrix::new_prompt(), Command::none()),
}
@ -84,12 +99,21 @@ impl Application for Retrix {
String::from("Retrix matrix client")
}
fn subscription(&self) -> Subscription<Self::Message> {
match self {
Retrix::LoggedIn { client, .. } => {
matrix::MatrixSync::subscription(client.clone()).map(Message::Sync)
}
_ => Subscription::none(),
}
}
fn update(&mut self, message: Self::Message) -> Command<Self::Message> {
match *self {
match self {
Retrix::Prompt {
ref mut user,
ref mut password,
ref mut server,
user,
password,
server,
..
} => match message {
Message::SetUser(u) => *user = u,
@ -99,7 +123,7 @@ impl Application for Retrix {
let user = user.clone();
let password = password.clone();
let server = server.clone();
*self = Retrix::AwaitLogin;
*self = Retrix::AwaitLogin(std::time::Instant::now());
return Command::perform(
async move { matrix::login(&user, &password, &server).await },
|result| match result {
@ -110,7 +134,7 @@ impl Application for Retrix {
}
_ => (),
},
Retrix::AwaitLogin => match message {
Retrix::AwaitLogin(_) => match message {
Message::LoginFailed(e) => {
*self = Retrix::new_prompt();
if let Retrix::Prompt { ref mut error, .. } = *self {
@ -121,26 +145,32 @@ impl Application for Retrix {
*self = Retrix::LoggedIn {
client: client.clone(),
session,
rooms: Vec::new(),
rooms: BTreeMap::new(),
selected: None,
room_scroll: Default::default(),
buttons: Default::default(),
messages: Default::default(),
};
let client = client.clone();
return Command::perform(
async move {
let mut list = Vec::new();
for (_, room) in client.joined_rooms().read().await.iter() {
let mut rooms = BTreeMap::new();
for (id, room) in client.joined_rooms().read().await.iter() {
let name = room.read().await.display_name();
list.push(name);
rooms.insert(id.to_owned(), name);
}
list
rooms
},
|rooms| Message::ResetRooms(rooms),
);
}
_ => (),
},
Retrix::LoggedIn { ref mut rooms, .. } => match message {
Retrix::LoggedIn {
rooms, selected, ..
} => match message {
Message::ResetRooms(r) => *rooms = r,
Message::SelectRoom(r) => *selected = Some(r),
_ => (),
},
};
@ -148,39 +178,32 @@ impl Application for Retrix {
}
fn view(&mut self) -> Element<Self::Message> {
match *self {
match self {
Retrix::Prompt {
ref mut user_input,
ref mut password_input,
ref mut server_input,
ref mut login_button,
ref user,
ref password,
ref server,
ref error,
user_input,
password_input,
server_input,
login_button,
user,
password,
server,
error,
} => {
// Login form
let mut content = Column::new()
.width(500.into())
.push(Text::new("Username"))
.push(
TextInput::new(user_input, "Username", user, |val| Message::SetUser(val))
.padding(5),
)
.push(TextInput::new(user_input, "Username", user, Message::SetUser).padding(5))
.push(Text::new("Password"))
.push(
TextInput::new(password_input, "Password", password, |val| {
Message::SetPassword(val)
})
.password()
.padding(5),
TextInput::new(password_input, "Password", password, Message::SetPassword)
.password()
.padding(5),
)
.push(Text::new("Homeserver"))
.push(
TextInput::new(server_input, "Server", server, |val| {
Message::SetServer(val)
})
.padding(5),
TextInput::new(server_input, "Server", server, Message::SetServer)
.padding(5),
)
.push(Button::new(login_button, Text::new("Login")).on_press(Message::Login));
if let Some(ref error) = error {
@ -194,29 +217,121 @@ impl Application for Retrix {
.height(iced::Length::Fill)
.into()
}
Retrix::AwaitLogin => Container::new(Text::new("Logging in..."))
.center_x()
.center_y()
.width(Length::Fill)
.height(Length::Fill)
.into(),
Retrix::AwaitLogin(instant) => Container::new(Text::new(format!(
"Logging in{}",
match instant.elapsed().subsec_millis() / 333 {
0 => ".",
1 => "..",
2 => "...",
_ => "....",
}
)))
.center_x()
.center_y()
.width(Length::Fill)
.height(Length::Fill)
.into(),
Retrix::LoggedIn {
ref rooms,
ref mut room_scroll,
client,
room_scroll,
buttons,
selected,
..
} => {
//let mut root_row = Row::new().width(Length::Fill).height(Length::Fill);
let mut root_row = Row::new().width(Length::Fill).height(Length::Fill);
// Room list
let joined = client.joined_rooms();
let rooms = futures::executor::block_on(async { joined.read().await });
let mut room_col = Scrollable::new(room_scroll)
.width(400.into())
.height(Length::Fill)
.spacing(15);
for room in rooms {
room_col = room_col.push(Text::new(room));
.scrollbar_width(5);
// We have to iterate the buttons map and not the other way around to make the
// borrow checker happy. First we make sure there's a button entry for every room
// entry, and clean up button entries from removed rooms.
for (id, _) in rooms.iter() {
buttons.entry(id.to_owned()).or_default();
}
room_col.into()
//root_row = root_row.push(room_col);
//root_row.into()
buttons.retain(|id, _| rooms.contains_key(id));
// Then we make our buttons
let buttons: Vec<Button<_>> = buttons
.iter_mut()
.map(|(id, state)| {
// Get read lock for the room
let room = futures::executor::block_on(async {
rooms.get(id).unwrap().read().await
});
Button::new(state, Text::new(room.display_name()))
.on_press(Message::SelectRoom(id.to_owned()))
.width(400.into())
})
.collect();
// Then we add them to our room column. What a mess.
for button in buttons {
room_col = room_col.push(button);
}
root_row = root_row.push(room_col);
// Messages.
//
// Get selected room.
let selected_room = selected.as_ref().and_then(|selected| {
futures::executor::block_on(async {
match rooms.get(selected) {
Some(room) => Some(room.read().await),
None => None,
}
})
});
if let Some(room) = selected_room {
let mut col = Column::new()
.spacing(5)
.padding(5)
.push(Text::new(room.display_name()).size(25))
.push(Rule::horizontal(2));
for message in room.messages.iter() {
if let AnyPossiblyRedactedSyncMessageEvent::Regular(event) = message {
match event {
AnySyncMessageEvent::RoomMessage(room_message) => {
match &room_message.content {
MessageEventContent::Text(text) => {
let row = Row::new()
.spacing(5)
.push(
Text::new(room_message.sender.localpart())
.color([0.2, 0.2, 1.0]),
)
.push(Text::new(&text.body).width(Length::Fill))
.push(Text::new(format_systime(
room_message.origin_server_ts,
)));
col = col.push(row);
}
_ => (),
}
}
_ => (),
}
}
}
root_row = root_row.push(col);
}
root_row.into()
}
}
}
}
fn format_systime(time: std::time::SystemTime) -> String {
let secs = time
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
format!(
"{:02}:{:02}",
(secs % (60 * 60 * 24)) / (60 * 60),
(secs % (60 * 60)) / 60
)
}