Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
563 changes: 411 additions & 152 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ reqwest = { version = "0.12", features = [
"http2",
"charset",
], default-features = false }
url = { version = "2", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
async-trait = "0.1"
Expand Down
52 changes: 37 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,26 @@ OPTIONS:

The feed configuration is passed as a TOML file.

### Feed
### Default Settings

The `default` section defines global settings that apply to all feeds:

| Field | Type | Required | Default | Description |
| -------------|:----:|:--------:|:--------:| ----------- |
| `url` | string | Yes | | URL to the RSS feed |
| `interval` | string | No | 60s | Specifies the time interval between checks. E.g. `10m`, `3h`, `1d`. |
| `retry_limit` | uint | No | 10 | Specifies the retries after certain errors. |
| `sink` | object | Yes | | Sink options |
| `sink` | object | No | | Default sink options to use for all feeds |

### Feeds

The `feeds` section is an array that contains individual feed configurations:

| Field | Type | Required | Default | Description |
| -------------|:----:|:--------:|:--------:| ----------- |
| `url` | string | Yes | | URL to the RSS feed |
| `interval` | string | No | From default | Override the default interval for this feed |
| `retry_limit` | uint | No | From default | Override the default retry limit for this feed |
| `sink` | object | No | From default | Override sink options for this feed |

### Discord Sink

Expand Down Expand Up @@ -125,21 +137,31 @@ Streams feed items in [NDJSON](https://en.wikipedia.org/wiki/JSON_streaming#Line
### Config Example

```TOML
# Default settings for all feeds
[default]
interval = "5m"
retry_limit = 10

# Default sink configuration
[default.sink]
type = "discord"
url = "https://discord.com/api/webhooks/84175.../OZdejNBCL1..."

# Feed 1
[feeds.github-blog]
[[feeds]]
url = "https://github.blog/all.atom"
interval = "10m"
retry_limit = 5
sink.type = "discord"
sink.url = "https://discord.com/api/webhooks/84175.../OZdejNBCL1..."
interval = "10m" # Override default interval
retry_limit = 5 # Override default retry limit

# Feed 2
[feeds.rust-blog]
[[feeds]]
url = "https://blog.rust-lang.org/feed.xml"
interval = "1m"

[feeds.rust-blog.sink]
type = "custom"
command = "bash"
arguments = ["-c", "cat - >> ./rust-blog.log"]
interval = "1m" # Override default interval
sink.type = "custom" # Override default sink
sink.command = "bash"
sink.arguments = ["-c", "cat - >> ./rust-blog.log"]

# Feed 3 - uses all default settings
[[feeds]]
url = "https://example.com/feed.xml"
```
31 changes: 22 additions & 9 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
use crate::{sink::SinkOptions, Result};

use std::{collections::HashMap, path::Path, time::Duration};
use std::{path::Path, time::Duration};

use serde::Deserialize;
use tokio::fs;
use url::Url;

const fn retry_limit_default() -> usize {
10
}

const fn interval_default() -> Duration {
Duration::from_secs(60)
}

#[derive(Debug, Deserialize)]
pub struct Config {
pub feeds: HashMap<String, Feed>,
pub default: Default,
pub feeds: Vec<Feed>,
}

impl Config {
Expand All @@ -20,15 +30,18 @@ impl Config {
}

#[derive(Debug, Deserialize)]
pub struct Feed {
pub url: String,
pub sink: SinkOptions,
#[serde(default, with = "humantime_serde")]
pub interval: Option<Duration>,
pub struct Default {
pub sink: Option<SinkOptions>,
#[serde(default = "interval_default", with = "humantime_serde")]
pub interval: Duration,
#[serde(default = "retry_limit_default")]
pub retry_limit: usize,
}

const fn retry_limit_default() -> usize {
10
#[derive(Debug, Deserialize)]
pub struct Feed {
pub url: Url,
pub sink: Option<SinkOptions>,
pub interval: Option<Duration>,
pub retry_limit: Option<usize>,
}
16 changes: 16 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
pub enum Error {
#[error("feed error: {0}")]
Feed(#[from] FeedError),
#[error("config error: {0}")]
Config(#[from] ConfigError),
#[error("sink error: {0}")]
Sink(String),
#[error("watcher error: {0}")]
Watcher(#[from] WatcherError),
#[error("json error: {0}")]
Json(#[from] serde_json::error::Error),
#[error("task error: {0}")]
Expand All @@ -29,3 +33,15 @@ pub enum FeedError {
#[error("html2text error: {0}")]
Html2Text(#[from] html2text::Error),
}

#[derive(thiserror::Error, Debug)]
pub enum ConfigError {
#[error("no sink configured")]
MissingSink,
}

#[derive(thiserror::Error, Debug)]
pub enum WatcherError {
#[error("watcher failed")]
Failed,
}
95 changes: 23 additions & 72 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@ mod feed;
mod sink;
mod watcher;

use crate::{
config::{Config, Feed},
watcher::Watcher,
};
use crate::config::Config;

use std::{
collections::HashMap,
env,
io::{stdout, IsTerminal},
path::PathBuf,
Expand All @@ -25,13 +21,10 @@ use reqwest::{
header::{self, HeaderMap, HeaderName, HeaderValue},
Client,
};
use tokio::{
signal::unix::{signal, SignalKind},
sync::broadcast,
task::JoinSet,
};
use tracing::{debug, error, info};
use tokio::signal::unix::{signal, SignalKind};
use tracing::{debug, error};
use tracing_subscriber::EnvFilter;
use watcher::WatcherCollection;

#[cfg(feature = "mimalloc")]
#[global_allocator]
Expand Down Expand Up @@ -154,20 +147,28 @@ async fn main() -> Result<()> {
}
};

let client = build_client()?;
let watchers = WatcherCollection::try_from(config)?;

let shutdown_handle = watchers.shutdown_handle();

let mut tasks = watch_feeds(config.feeds, client)?;
let mut task_failed = false;
while let Some(res) = tasks.join_next().await {
let abort = if let Ok(r) = res { r.is_err() } else { true };
if abort && !task_failed {
tasks.abort_all();
task_failed = true;
tokio::spawn(async move {
let mut sig_int = signal(SignalKind::interrupt()).unwrap();
let mut sig_term = signal(SignalKind::terminate()).unwrap();

tokio::select! {
_ = sig_int.recv() => {},
_ = sig_term.recv() => {},
};

debug!("received termination signal");

if let Err(e) = shutdown_handle.send(()) {
error!("error while sending shutdown signal: {e}");
}
}
});

if task_failed {
eprintln!("Terminate due to a faulty watcher");
if let Err(e) = watchers.wait().await {
error!("Error while waiting for watchers: {e}");
process::exit(1);
}

Expand Down Expand Up @@ -272,53 +273,3 @@ fn parse_env_filter(debug: bool, verbose: bool) -> EnvFilter {
(false, _, _) => EnvFilter::from_default_env(),
}
}

fn watch_feeds(feeds: HashMap<String, Feed>, client: Client) -> Result<JoinSet<Result<()>>> {
let mut tasks = JoinSet::new();

let (tx, _) = broadcast::channel(feeds.len());

for (name, config) in feeds.into_iter() {
let sink = config.sink.sink(&client)?;
let watcher = Watcher::new(
config.url,
sink,
config.interval,
client.clone(),
config.retry_limit,
)?;

let rx = tx.subscribe();

tasks.spawn(async move {
info!("starting watcher for \"{name}\"");

if let Err(err) = watcher.watch(rx).await {
error!(
feed = %name,
error = %err,
"shutting down watcher due to an error",
);
return Err(err);
}

Ok(())
});
}

tokio::spawn(async move {
let mut sig_int = signal(SignalKind::interrupt()).unwrap();
let mut sig_term = signal(SignalKind::terminate()).unwrap();

tokio::select! {
_ = sig_int.recv() => {},
_ = sig_term.recv() => {},
};

debug!("received termination signal");

tx.send(()).unwrap();
});

Ok(tasks)
}
9 changes: 3 additions & 6 deletions src/sink/discord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::Sink;

use async_trait::async_trait;
use chrono::{DateTime, FixedOffset};
use reqwest::{Client, IntoUrl, Url};
use reqwest::{Client, Url};
use serde::Serialize;
use tracing::debug;

Expand All @@ -24,11 +24,8 @@ pub struct Discord {
}

impl Discord {
pub fn new<T: IntoUrl>(url: T, client: Client) -> Result<Self> {
Ok(Self {
url: url.into_url()?,
client,
})
pub fn new(url: Url, client: Client) -> Result<Self> {
Ok(Self { url, client })
}
}

Expand Down
13 changes: 8 additions & 5 deletions src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@ use crate::{feed::item::FeedItem, Result};

use self::{custom::Custom, discord::Discord, slack::Slack};

use std::fmt;

use async_trait::async_trait;
use reqwest::Client;
use serde::Deserialize;
use url::Url;

#[derive(Debug, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum SinkOptions {
Discord {
url: String,
url: Url,
},
Slack {
url: String,
url: Url,
},
Custom {
command: String,
Expand All @@ -41,12 +44,12 @@ impl SinkOptions {
}

#[async_trait]
pub trait Sink {
pub trait Sink: fmt::Debug + Send + Sync {
async fn push<'a, T>(&self, items: &'a [T]) -> Result<()>
where
T: FeedItem<'a>;

async fn shutdown(mut self) -> Result<()>;
async fn shutdown(self) -> Result<()>;
}

#[derive(Debug)]
Expand Down
9 changes: 3 additions & 6 deletions src/sink/slack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
use super::Sink;

use async_trait::async_trait;
use reqwest::{Client, IntoUrl, Url};
use reqwest::{Client, Url};
use serde::Serialize;
use slack_bk::{
blocks::{Block, Context, ContextElement, Divider, Header, Section},
Expand All @@ -24,11 +24,8 @@ pub struct Slack {
}

impl Slack {
pub fn new<T: IntoUrl>(url: T, client: Client) -> Result<Self> {
Ok(Self {
url: url.into_url()?,
client,
})
pub fn new(url: Url, client: Client) -> Result<Self> {
Ok(Self { url, client })
}
}

Expand Down
Loading