Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,8 @@ tokio-socks = "0.5.2"
tokio-util = "0.7.15"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"


[features]
default = []
integration-test = []
71 changes: 70 additions & 1 deletion src/indexer/tracker_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use r2d2::Pool;
use tokio::{sync::mpsc::Sender, time::Instant};

use bitcoincore_rpc::bitcoin::absolute::{Height, LockTime};
use std::str::FromStr;
use tracing::info;

use super::rpc::BitcoinRpc;
Expand Down Expand Up @@ -38,7 +39,7 @@ pub async fn run(
continue;
}

if tx.output.len() < 2 {
if tx.output.len() < 2 || tx.output.len() > 5 {
continue;
}

Expand Down Expand Up @@ -96,13 +97,23 @@ fn extract_onion_address_from_script(script: &[u8]) -> Option<String> {

let data = &script[data_start..data_start + data_len];
let decoded = String::from_utf8(data.to_vec()).ok()?;

#[cfg(not(feature = "integration-test"))]
if is_valid_onion_address(&decoded) {
Some(decoded)
} else {
None
}

#[cfg(feature = "integration-test")]
if is_valid_address(&decoded) {
Some(decoded)
} else {
None
}
}

#[cfg(not(feature = "integration-test"))]
fn is_valid_onion_address(s: &str) -> bool {
let parts: Vec<&str> = s.split(':').collect();
if parts.len() != 2 {
Expand All @@ -115,3 +126,61 @@ fn is_valid_onion_address(s: &str) -> bool {
}
matches!(port.parse::<u16>(), Ok(p) if p > 0)
}

#[cfg(feature = "integration-test")]
fn is_valid_address(s: &str) -> bool {
let parts: Vec<&str> = s.split(':').collect();
if parts.len() != 2 {
return false;
}

let ip = parts[0];
let port = parts[1];

if std::net::Ipv4Addr::from_str(ip).is_err() {
return false;
}

matches!(port.parse::<u16>(), Ok(p) if p > 0)
}

#[cfg(not(feature = "integration-test"))]
#[cfg(test)]
mod tests_onion {
use super::*;

#[test]
fn test_valid_onion_address() {
assert!(is_valid_onion_address("example.onion:1234"));
assert!(is_valid_onion_address("abc1234567890def.onion:65535"));
}

#[test]
fn test_invalid_onion_address() {
assert!(!is_valid_onion_address("example.com:1234"));
assert!(!is_valid_onion_address("example.onion:0"));
assert!(!is_valid_onion_address("example.onion"));
assert!(!is_valid_onion_address("127.0.0.1:8080"));
}
}

#[cfg(feature = "integration-test")]
#[cfg(test)]
mod tests_ipv4 {
use super::*;

#[test]
fn test_valid_ipv4_address() {
assert!(is_valid_address("127.0.0.1:8080"));
assert!(is_valid_address("192.168.1.1:65535"));
}

#[test]
fn test_invalid_ipv4_address() {
assert!(!is_valid_address("example.onion:1234"));
assert!(!is_valid_address("256.0.0.1:8080"));
assert!(!is_valid_address("127.0.0.1:0"));
assert!(!is_valid_address("127.0.0.1"));
assert!(!is_valid_address("::1:8080"));
}
}
20 changes: 19 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod utils;
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();

#[cfg(not(feature = "integration-test"))]
#[derive(Debug, Clone)]
pub struct Config {
pub rpc_url: String,
Expand All @@ -35,6 +36,15 @@ pub struct Config {
pub datadir: String,
}

#[cfg(feature = "integration-test")]
#[derive(Debug, Clone)]
pub struct Config {
pub rpc_url: String,
pub rpc_auth: Auth,
pub address: String,
pub datadir: String,
}

fn run_migrations(pool: Arc<Pool<ConnectionManager<SqliteConnection>>>) {
let mut conn = pool
.get()
Expand All @@ -58,10 +68,12 @@ pub async fn start(cfg: Config) {
run_migrations(pool.clone());
info!("Connected to indexer db");

#[cfg(not(feature = "integration-test"))]
tor::check_tor_status(cfg.control_port, &cfg.tor_auth_password)
.await
.expect("Failed to check Tor status");

#[cfg(not(feature = "integration-test"))]
let hostname = match cfg.address.split_once(':') {
Some((_, port)) => {
let port = port.parse::<u16>().expect("Invalid port in address");
Expand All @@ -80,6 +92,9 @@ pub async fn start(cfg: Config) {
}
};

#[cfg(feature = "integration-test")]
let hostname = cfg.address.clone();

info!("Tracker is listening at {}", hostname);

let (mut db_tx, db_rx) = mpsc::channel::<DbRequest>(10);
Expand All @@ -93,6 +108,7 @@ pub async fn start(cfg: Config) {
db_tx.clone(),
status_tx.clone(),
cfg.address.clone(),
#[cfg(not(feature = "integration-test"))]
cfg.socks_port,
hostname.clone(),
)
Expand Down Expand Up @@ -125,6 +141,7 @@ pub async fn start(cfg: Config) {
db_tx.clone(),
status_tx.clone(),
cfg.address.clone(),
#[cfg(not(feature = "integration-test"))]
cfg.socks_port,
hostname.clone(),
)
Expand Down Expand Up @@ -162,14 +179,15 @@ async fn spawn_server(
db_tx: tokio::sync::mpsc::Sender<DbRequest>,
status_tx: tokio::sync::mpsc::Sender<Status>,
address: String,
socks_port: u16,
#[cfg(not(feature = "integration-test"))] socks_port: u16,
hostname: String,
) {
info!("Spawning server instance");
tokio::spawn(server::run(
db_tx,
status::Sender::Server(status_tx),
address,
#[cfg(not(feature = "integration-test"))]
socks_port,
hostname,
));
Expand Down
9 changes: 9 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async fn main() {
(parts[0].to_string(), parts[1].to_string())
};

#[cfg(not(feature = "integration-test"))]
let cfg = Config {
rpc_url: args.rpc,
rpc_auth: Auth::UserPass(user, pass),
Expand All @@ -40,5 +41,13 @@ async fn main() {
datadir: args.datadir,
};

#[cfg(feature = "integration-test")]
let cfg = Config {
rpc_url: args.rpc,
rpc_auth: Auth::UserPass(user, pass),
address: args.address,
datadir: args.datadir,
};

start(cfg).await;
}
8 changes: 7 additions & 1 deletion src/server/tracker_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::time::Duration;

#[cfg(feature = "integration-test")]
use tokio::net::TcpStream;
use tokio::{
io::BufWriter,
sync::mpsc::Sender,
Expand All @@ -23,7 +25,7 @@ const COOLDOWN_PERIOD: u64 = 5;
pub async fn monitor_systems(
db_tx: Sender<DbRequest>,
status_tx: status::Sender,
socks_port: u16,
#[cfg(not(feature = "integration-test"))] socks_port: u16,
onion_address: String,
port: u16,
) -> Result<(), TrackerError> {
Expand All @@ -45,12 +47,16 @@ pub async fn monitor_systems(

let mut success = false;
for attempt in 1..=3 {
#[cfg(not(feature = "integration-test"))]
let connect_result = Socks5Stream::connect(
format!("127.0.0.1:{socks_port:?}").as_str(),
address.clone(),
)
.await;

#[cfg(feature = "integration-test")]
let connect_result = TcpStream::connect(address.clone()).await;

match connect_result {
Ok(mut stream) => {
success = true;
Expand Down
3 changes: 2 additions & 1 deletion src/server/tracker_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub async fn run(
db_tx: Sender<DbRequest>,
status_tx: status::Sender,
address: String,
socks_port: u16,
#[cfg(not(feature = "integration-test"))] socks_port: u16,
onion_address: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let port = address
Expand All @@ -31,6 +31,7 @@ pub async fn run(
tokio::spawn(monitor_systems(
db_tx.clone(),
status_tx.clone(),
#[cfg(not(feature = "integration-test"))]
socks_port,
onion_address,
port,
Expand Down