mastodon_api/
streaming.rs1use crate::error::{MastodonError, Result};
2use crate::models::{Notification, Status};
3use futures_util::StreamExt;
4use serde::Deserialize;
5use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
6use url::Url;
7
8#[derive(Debug, Deserialize)]
10#[serde(tag = "event", content = "payload")]
11pub enum StreamEvent {
12 #[serde(rename = "update")]
14 Update(String), #[serde(rename = "notification")]
17 Notification(String), #[serde(rename = "delete")]
20 Delete(String), #[serde(rename = "filters_changed")]
23 FiltersChanged,
24}
25
26#[derive(Debug)]
28pub enum MastodonEvent {
29 Update(Box<Status>),
31 Notification(Box<Notification>),
33 Delete(String),
35 FiltersChanged,
37}
38
/// Client for the WebSocket streaming endpoint of a Mastodon-compatible server.
pub struct StreamingClient {
    /// WebSocket URL of the streaming endpoint (`…/api/v1/streaming`).
    stream_url: String,
    /// Optional token, sent as the `access_token` query parameter when present.
    access_token: Option<String>,
}
44
45impl StreamingClient {
46 pub fn new(base_url: &str, access_token: Option<String>) -> Self {
47 let stream_url = base_url
48 .replace("https://", "wss://")
49 .replace("http://", "ws://")
50 + "/api/v1/streaming";
51 Self {
52 stream_url,
53 access_token,
54 }
55 }
56
57 pub async fn subscribe(
67 &self,
68 stream_type: &str,
69 ) -> Result<std::pin::Pin<Box<dyn futures_util::Stream<Item = Result<MastodonEvent>> + Send>>>
70 {
71 let mut url = Url::parse(&self.stream_url)?;
72 url.query_pairs_mut().append_pair("stream", stream_type);
73 if let Some(token) = &self.access_token {
74 url.query_pairs_mut().append_pair("access_token", token);
75 }
76
77 let (ws_stream, _) = connect_async(url.to_string())
78 .await
79 .map_err(|e| MastodonError::Custom(e.to_string()))?;
80 let (_, read) = ws_stream.split();
81
82 Ok(Box::pin(read.filter_map(|msg| async {
83 match msg {
84 Ok(Message::Text(text)) => {
85 let event: StreamEvent = match serde_json::from_str(&text) {
86 Ok(e) => e,
87 Err(_) => return None,
88 };
89
90 match event {
91 StreamEvent::Update(payload) => {
92 let status: Status = serde_json::from_str(&payload).ok()?;
93 Some(Ok(MastodonEvent::Update(Box::new(status))))
94 }
95 StreamEvent::Notification(payload) => {
96 let notification: Notification = serde_json::from_str(&payload).ok()?;
97 Some(Ok(MastodonEvent::Notification(Box::new(notification))))
98 }
99 StreamEvent::Delete(id) => Some(Ok(MastodonEvent::Delete(id))),
100 StreamEvent::FiltersChanged => Some(Ok(MastodonEvent::FiltersChanged)),
101 }
102 }
103 _ => None,
104 }
105 })))
106 }
107}