// mastodon_api/streaming.rs

1use crate::error::{MastodonError, Result};
2use crate::models::{Notification, Status};
3use futures_util::StreamExt;
4use serde::Deserialize;
5use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
6use url::Url;
7
8/// Raw event received from the Mastodon streaming API.
9#[derive(Debug, Deserialize)]
10#[serde(tag = "event", content = "payload")]
11pub enum StreamEvent {
12    /// A new status has been posted.
13    #[serde(rename = "update")]
14    Update(String), // JSON string of Status
15    /// A new notification has been received.
16    #[serde(rename = "notification")]
17    Notification(String), // JSON string of Notification
18    /// A status has been deleted.
19    #[serde(rename = "delete")]
20    Delete(String), // ID of deleted status
21    /// Content filters have changed.
22    #[serde(rename = "filters_changed")]
23    FiltersChanged,
24}
25
26/// A high-level, parsed event from the Mastodon stream.
27#[derive(Debug)]
28pub enum MastodonEvent {
29    /// A new status has been posted.
30    Update(Box<Status>),
31    /// A new notification (mention, follow, etc.) has been received.
32    Notification(Box<Notification>),
33    /// a status was deleted (contains the ID).
34    Delete(String),
35    /// Content filters were updated.
36    FiltersChanged,
37}
38
/// Handle for opening WebSocket subscriptions against Mastodon's real-time
/// streaming API.
///
/// The struct only stores connection settings; no socket is opened until a
/// subscription is requested.
pub struct StreamingClient {
    /// WebSocket URL of the streaming endpoint (scheme, host and
    /// `/api/v1/streaming` path), without any query parameters.
    stream_url: String,
    /// Optional access token; when present it is sent as the `access_token`
    /// query parameter on connect.
    access_token: Option<String>,
}
44
45impl StreamingClient {
46    pub fn new(base_url: &str, access_token: Option<String>) -> Self {
47        let stream_url = base_url
48            .replace("https://", "wss://")
49            .replace("http://", "ws://")
50            + "/api/v1/streaming";
51        Self {
52            stream_url,
53            access_token,
54        }
55    }
56
57    /// Subscribes to a specific stream type.
58    ///
59    /// # Common Stream Types
60    /// - `user`: Events related to the authenticated user.
61    /// - `public`: All public statuses.
62    /// - `direct`: Direct messages/conversations.
63    /// - `hashtag`: Statuses containing a specific tag.
64    ///
65    /// Returns a pinned stream of `MastodonEvent`s.
66    pub async fn subscribe(
67        &self,
68        stream_type: &str,
69    ) -> Result<std::pin::Pin<Box<dyn futures_util::Stream<Item = Result<MastodonEvent>> + Send>>>
70    {
71        let mut url = Url::parse(&self.stream_url)?;
72        url.query_pairs_mut().append_pair("stream", stream_type);
73        if let Some(token) = &self.access_token {
74            url.query_pairs_mut().append_pair("access_token", token);
75        }
76
77        let (ws_stream, _) = connect_async(url.to_string())
78            .await
79            .map_err(|e| MastodonError::Custom(e.to_string()))?;
80        let (_, read) = ws_stream.split();
81
82        Ok(Box::pin(read.filter_map(|msg| async {
83            match msg {
84                Ok(Message::Text(text)) => {
85                    let event: StreamEvent = match serde_json::from_str(&text) {
86                        Ok(e) => e,
87                        Err(_) => return None,
88                    };
89
90                    match event {
91                        StreamEvent::Update(payload) => {
92                            let status: Status = serde_json::from_str(&payload).ok()?;
93                            Some(Ok(MastodonEvent::Update(Box::new(status))))
94                        }
95                        StreamEvent::Notification(payload) => {
96                            let notification: Notification = serde_json::from_str(&payload).ok()?;
97                            Some(Ok(MastodonEvent::Notification(Box::new(notification))))
98                        }
99                        StreamEvent::Delete(id) => Some(Ok(MastodonEvent::Delete(id))),
100                        StreamEvent::FiltersChanged => Some(Ok(MastodonEvent::FiltersChanged)),
101                    }
102                }
103                _ => None,
104            }
105        })))
106    }
107}