Incorporate state store rewrite

Also add prototype message history backfill. Backfilled encrypted
messages can't be handled before some api changes.
This commit is contained in:
Amanda Graven 2021-01-03 21:08:19 +01:00
parent 45c4b62150
commit 2a9dbb90d3
Signed by: amanda
GPG key ID: 45C461CDC9286390
3 changed files with 123 additions and 44 deletions

View file

@ -23,7 +23,7 @@ tracing-subscriber = { version = "0.2", features = ["parking_lot"] }
[dependencies.matrix-sdk] [dependencies.matrix-sdk]
git = "https://github.com/matrix-org/matrix-rust-sdk" git = "https://github.com/matrix-org/matrix-rust-sdk"
rev = "d9e5a17" rev = "74998c8"
default_features = false default_features = false
features = ["encryption", "sqlite_cryptostore", "messages", "rustls-tls", "unstable-synapse-quirks"] features = ["encryption", "sqlite_cryptostore", "messages", "rustls-tls", "unstable-synapse-quirks"]

View file

@ -214,10 +214,6 @@ where
sender.send(Event::Token(response.next_batch)).ok(); sender.send(Event::Token(response.next_batch)).ok();
for (id, room) in response.rooms.join { for (id, room) in response.rooms.join {
for event in room.timeline.events { for event in room.timeline.events {
let event = match event.deserialize() {
Ok(event) => event,
Err(_) => continue,
};
let id = id.clone(); let id = id.clone();
let event = match event { let event = match event {
AnySyncRoomEvent::Message(e) => { AnySyncRoomEvent::Message(e) => {
@ -237,10 +233,6 @@ where
} }
} }
for event in response.to_device.events { for event in response.to_device.events {
let event = match event.deserialize() {
Ok(event) => event,
Err(_) => continue,
};
sender.send(Event::ToDevice(event)).ok(); sender.send(Event::ToDevice(event)).ok();
} }
LoopCtrl::Continue LoopCtrl::Continue

157
src/ui.rs
View file

@ -9,10 +9,13 @@ use iced::{
Subscription, Text, TextInput, Subscription, Text, TextInput,
}; };
use matrix_sdk::{ use matrix_sdk::{
api::r0::message::get_message_events::{
Request as MessageRequest, Response as MessageResponse,
},
events::{ events::{
key::verification::cancel::CancelCode as VerificationCancelCode, key::verification::cancel::CancelCode as VerificationCancelCode,
room::message::MessageEventContent, AnyMessageEvent, AnyMessageEventContent, room::message::MessageEventContent, AnyMessageEvent, AnyMessageEventContent, AnyRoomEvent,
AnyPossiblyRedactedSyncMessageEvent, AnyRoomEvent, AnyStateEvent, AnyToDeviceEvent, AnyStateEvent, AnyToDeviceEvent,
}, },
identifiers::{EventId, RoomAliasId, RoomId, UserId}, identifiers::{EventId, RoomAliasId, RoomId, UserId},
}; };
@ -85,7 +88,7 @@ impl MessageBuffer {
/// Sorts the messages by send time /// Sorts the messages by send time
fn sort(&mut self) { fn sort(&mut self) {
self.messages self.messages
.sort_unstable_by(|a, b| a.origin_server_ts().cmp(&b.origin_server_ts())) .sort_unstable_by_key(|msg| msg.origin_server_ts())
} }
/// Gets the send time of the most recently sent message /// Gets the send time of the most recently sent message
@ -209,6 +212,7 @@ impl MainView {
.scrollbar_width(5); .scrollbar_width(5);
// Group by DM and group conversation // Group by DM and group conversation
#[allow(clippy::type_complexity)]
let (mut dm_rooms, mut group_rooms): ( let (mut dm_rooms, mut group_rooms): (
Vec<(&RoomId, &RoomEntry)>, Vec<(&RoomId, &RoomEntry)>,
Vec<(&RoomId, &RoomEntry)>, Vec<(&RoomId, &RoomEntry)>,
@ -219,14 +223,16 @@ impl MainView {
// Sort // Sort
for list in [&mut dm_rooms, &mut group_rooms].iter_mut() { for list in [&mut dm_rooms, &mut group_rooms].iter_mut() {
match self.sorting { match self.sorting {
RoomSorting::Alphabetic => list.sort_unstable_by(|(_, a), (_, b)| { RoomSorting::Alphabetic => {
a.name.to_uppercase().cmp(&b.name.to_uppercase()) list.sort_unstable_by_key(|(_, room)| room.name.to_uppercase())
}), }
// TODO: fix this
RoomSorting::Recent => list.sort_unstable_by(|(_, a), (_, b)| { RoomSorting::Recent => list.sort_unstable_by(|(_, a), (_, b)| {
a.messages.updated.cmp(&b.messages.updated).reverse() a.messages.updated.cmp(&b.messages.updated).reverse()
}), }),
}; };
} }
// Make sure button handler list has appropriate length
self.dm_buttons self.dm_buttons
.resize_with(dm_rooms.len(), Default::default); .resize_with(dm_rooms.len(), Default::default);
self.group_buttons self.group_buttons
@ -241,7 +247,7 @@ impl MainView {
let (id, room) = dm_rooms[idx]; let (id, room) = dm_rooms[idx];
Button::new(button, Text::new(&room.name)) Button::new(button, Text::new(&room.name))
.width(300.into()) .width(300.into())
.on_press(Message::SelectRoom(id.clone().clone())) .on_press(Message::SelectRoom(id.to_owned()))
}) })
.collect(); .collect();
let room_buttons: Vec<Button<_>> = self let room_buttons: Vec<Button<_>> = self
@ -252,7 +258,7 @@ impl MainView {
let (id, room) = group_rooms[idx]; let (id, room) = group_rooms[idx];
Button::new(button, Text::new(&room.name)) Button::new(button, Text::new(&room.name))
.width(300.into()) .width(300.into())
.on_press(Message::SelectRoom(id.clone())) .on_press(Message::SelectRoom(id.to_owned()))
}) })
.collect(); .collect();
// Add buttons to container // Add buttons to container
@ -279,23 +285,30 @@ impl MainView {
None => None, None => None,
}; };
if let Some(room) = selected_room { if let Some(room) = selected_room {
let title = if let Some(ref direct) = room.direct {
format!("{} ({})", &room.name, direct)
} else if let Some(ref alias) = room.alias {
format!("{} ({})", &room.name, alias)
} else {
room.name.clone()
};
message_col = message_col message_col = message_col
.push(Text::new(&room.name).size(25)) .push(Text::new(title).size(25))
.push(Rule::horizontal(2)); .push(Rule::horizontal(2));
let mut scroll = Scrollable::new(&mut self.message_scroll) let mut scroll = Scrollable::new(&mut self.message_scroll)
.scrollbar_width(2) .scrollbar_width(2)
.height(Length::Fill); .height(Length::Fill);
for event in room.messages.messages.iter() { for event in room.messages.messages.iter() {
#[allow(clippy::single_match)]
match event { match event {
AnyRoomEvent::Message(AnyMessageEvent::RoomMessage(message)) => { AnyRoomEvent::Message(AnyMessageEvent::RoomMessage(message)) => {
let sender = { let sender = {
let joined = self.client.joined_rooms(); match self.client.get_joined_room(&message.room_id) {
let rooms_lock = block_on(async { joined.read().await });
match rooms_lock.get(&message.room_id) {
Some(backend) => { Some(backend) => {
let room_lock = block_on(async { backend.read().await }); match block_on(async {
match room_lock.joined_members.get(&message.sender) { backend.get_member(&message.sender).await
Some(member) => member.disambiguated_name(), }) {
Some(member) => member.name().to_owned(),
None => message.sender.to_string(), None => message.sender.to_string(),
} }
} }
@ -348,6 +361,9 @@ impl MainView {
.push(Text::new(format_systime(message.origin_server_ts))); .push(Text::new(format_systime(message.origin_server_ts)));
scroll = scroll.push(row); scroll = scroll.push(row);
} }
AnyRoomEvent::Message(AnyMessageEvent::RoomEncrypted(_encrypted)) => {
scroll = scroll.push(Text::new("Encrypted event").color([0.3, 0.3, 0.3]))
}
_ => (), _ => (),
} }
} }
@ -445,6 +461,7 @@ impl MainView {
} }
} }
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum Retrix { pub enum Retrix {
Prompt(PromptView), Prompt(PromptView),
@ -471,6 +488,8 @@ pub enum Message {
ResetRoom(RoomId, RoomEntry), ResetRoom(RoomId, RoomEntry),
/// Get backfill for given room /// Get backfill for given room
BackFill(RoomId), BackFill(RoomId),
/// Received backfille
BackFilled(RoomId, MessageResponse),
/// View messages from this room /// View messages from this room
SelectRoom(RoomId), SelectRoom(RoomId),
/// Set error message /// Set error message
@ -608,27 +627,28 @@ impl Application for Retrix {
}, },
Retrix::AwaitLogin => match message { Retrix::AwaitLogin => match message {
Message::LoginFailed(e) => { Message::LoginFailed(e) => {
let mut view = PromptView::default(); let view = PromptView {
view.error = Some(e); error: Some(e),
..PromptView::default()
};
*self = Retrix::Prompt(view); *self = Retrix::Prompt(view);
} }
Message::LoggedIn(client, session) => { Message::LoggedIn(client, session) => {
*self = Retrix::LoggedIn(MainView::new(client.clone(), session)); *self = Retrix::LoggedIn(MainView::new(client.clone(), session));
let joined = client.joined_rooms();
let read = block_on(async { joined.read().await });
let mut commands: Vec<Command<Message>> = Vec::new(); let mut commands: Vec<Command<Message>> = Vec::new();
for (id, room) in read.iter() { for room in client.joined_rooms().into_iter() {
let id = id.clone();
let room = room.clone();
let client = client.clone();
let command = async move { let command = async move {
let room = room.read().await; let room = room.clone();
let mut entry = RoomEntry::default(); let entry = RoomEntry {
direct: room.direct_target(),
name: block_on(async { room.calculate_name().await }),
topic: room.topic().unwrap_or_default(),
..RoomEntry::default()
};
entry.direct = room.direct_target.clone();
// Display name calculation for DMs is bronk so we're doing it // Display name calculation for DMs is bronk so we're doing it
// ourselves // ourselves
match entry.direct { /*match entry.direct {
Some(ref direct) => { Some(ref direct) => {
let request = matrix_sdk::api::r0::profile::get_display_name::Request::new(direct); let request = matrix_sdk::api::r0::profile::get_display_name::Request::new(direct);
if let Ok(response) = client.send(request).await { if let Ok(response) = client.send(request).await {
@ -638,8 +658,8 @@ impl Application for Retrix {
} }
} }
None => entry.name = room.display_name(), None => entry.name = room.display_name(),
} }*/
let messages = room /*let messages = room
.messages .messages
.iter() .iter()
.cloned() .cloned()
@ -654,9 +674,10 @@ impl Application for Retrix {
} }
}) })
.collect(); .collect();
entry.messages.messages = messages; entry.messages.messages = messages;*/
Message::ResetRoom(id, entry) Message::ResetRoom(room.room_id().to_owned(), entry)
}.into(); }
.into();
commands.push(command) commands.push(command)
} }
return Command::batch(commands); return Command::batch(commands);
@ -670,12 +691,36 @@ impl Application for Retrix {
Message::ResetRoom(id, room) => { Message::ResetRoom(id, room) => {
view.rooms.insert(id, room).and(Some(())).unwrap_or(()) view.rooms.insert(id, room).and(Some(())).unwrap_or(())
} }
Message::SelectRoom(r) => view.selected = Some(r), Message::SelectRoom(r) => {
view.selected = Some(r.clone());
return async move { Message::BackFill(r) }.into();
}
Message::Sync(event) => match event { Message::Sync(event) => match event {
matrix::Event::Room(event) => match event { matrix::Event::Room(event) => match event {
AnyRoomEvent::Message(event) => { AnyRoomEvent::Message(event) => {
let room = view.rooms.entry(event.room_id().clone()).or_default(); let room = view.rooms.entry(event.room_id().clone()).or_default();
room.messages.push(AnyRoomEvent::Message(event)); room.messages.push(AnyRoomEvent::Message(event.clone()));
// Set read marker if message is in selected room
if view.selected.as_ref() == Some(event.room_id()) {
let client = view.client.clone();
return Command::perform(
async move {
client
.read_marker(
event.room_id(),
event.event_id(),
Some(event.event_id()),
)
.await
.err()
},
|result| match result {
Some(err) => Message::ErrorMessage(err.to_string()),
// TODO: Make this an actual no-op
None => Message::Login,
},
);
}
} }
AnyRoomEvent::State(event) => match event { AnyRoomEvent::State(event) => match event {
AnyStateEvent::RoomCanonicalAlias(ref alias) => { AnyStateEvent::RoomCanonicalAlias(ref alias) => {
@ -706,7 +751,7 @@ impl Application for Retrix {
let client = view.client.clone(); let client = view.client.clone();
return Command::perform( return Command::perform(
async move { async move {
tokio::time::delay_for(std::time::Duration::from_secs(2)).await; //tokio::time::delay_for(std::time::Duration::from_secs(2)).await;
client.get_verification(&start.content.transaction_id).await client.get_verification(&start.content.transaction_id).await
}, },
Message::SetVerification, Message::SetVerification,
@ -722,6 +767,48 @@ impl Application for Retrix {
view.sync_token = token; view.sync_token = token;
} }
}, },
Message::BackFill(id) => {
let room = view.rooms.get(&id).unwrap();
let client = view.client.clone();
let token = match room.messages.end.clone() {
Some(end) => end,
None => client
.get_joined_room(&id)
.unwrap()
.last_prev_batch()
.unwrap_or_else(|| view.sync_token.clone()),
};
return async move {
let request = MessageRequest::backward(&id, &token);
match client.room_messages(request).await {
Ok(response) => Message::BackFilled(id, response),
Err(e) => Message::ErrorMessage(e.to_string()),
}
}
.into();
}
Message::BackFilled(id, response) => {
let room = view.rooms.get_mut(&id).unwrap();
let events: Vec<AnyRoomEvent> = response
.chunk
.into_iter()
.filter_map(|e| e.deserialize().ok())
.chain(
response
.state
.into_iter()
.filter_map(|e| e.deserialize().ok().map(AnyRoomEvent::State)),
)
.collect();
if let Some(start) = response.start {
room.messages.start = Some(start);
}
if let Some(end) = response.end {
room.messages.end = Some(end);
}
room.messages.append(events);
}
Message::SetVerification(v) => view.sas = v, Message::SetVerification(v) => view.sas = v,
Message::VerificationAccept => { Message::VerificationAccept => {
let sas = match &view.sas { let sas = match &view.sas {