
Merge pull request #794 from jelmer/update-deps
Update deps
jelmer committed Sep 13, 2024
2 parents 296b940 + 7f4e339 commit 91ec97c
Showing 11 changed files with 477 additions and 729 deletions.
822 changes: 159 additions & 663 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -47,7 +47,7 @@ prometheus = { version = "0.13.4", features = ["reqwest"] }
[workspace.dependencies]
rslock = { default-features = false, version = "0.4.0" }
debian-changelog = "0.1.12"
-debian-analyzer = { git = "https://salsa.debian.org/jelmer/lintian-brush" }
+debian-analyzer = "0.158.11"
debversion = { version = "0.4" }
google-cloud-storage = { version = "0.20.0" }
google-cloud-auth = { version = "0.16.0" }
2 changes: 1 addition & 1 deletion publish/Cargo.toml
@@ -18,7 +18,7 @@ janitor = { version = "0.0.0", path = ".." }
log.workspace = true
minijinja = { version = "2", features = ["loader"] }
pyo3.workspace = true
-redis = { workspace = true, features = ["tokio-comp", "json"] }
+redis = { workspace = true, features = ["tokio-comp", "json", "connection-manager"] }
rslock = { workspace = true, default-features = false, features = ["tokio-comp"] }
reqwest.workspace = true
serde.workspace = true
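For context: the new "connection-manager" feature gates redis::aio::ConnectionManager, which the updated publish code relies on. Below is a minimal sketch of how such a manager is typically constructed and used; the Redis URL and key names are placeholders, not values from this repository.

use redis::AsyncCommands;

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    // Open a client; the URL is a placeholder for the configured redis_location.
    let client = redis::Client::open("redis://127.0.0.1/")?;
    // ConnectionManager reconnects automatically and can be cloned cheaply,
    // which is why one handle can be passed to several background tasks.
    let mut conn = redis::aio::ConnectionManager::new(client).await?;
    // Any AsyncCommands call works on the manager like on a plain connection.
    let _: () = conn.set("example-key", "example-value").await?;
    let value: String = conn.get("example-key").await?;
    println!("{}", value);
    Ok(())
}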
238 changes: 238 additions & 0 deletions publish/src/bin/janitor-publish.rs
@@ -0,0 +1,238 @@
use clap::Parser;
use janitor_publish::rate_limiter::{
FixedRateLimiter, NonRateLimiter, RateLimiter, SlowStartRateLimiter,
};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use url::Url;

#[derive(Parser)]
struct Args {
/// Maximum number of open merge proposals per bucket.
#[clap(long)]
max_mps_per_bucket: Option<usize>,

/// Prometheus push gateway to export to.
#[clap(long)]
prometheus: Option<Url>,

/// Just do one pass over the queue, don't run as a daemon.
#[clap(long, conflicts_with = "no_auto_publish")]
once: bool,

/// Listen address
#[clap(long, default_value = "0.0.0.0")]
listen_address: std::net::IpAddr,

/// Listen port
#[clap(long, default_value = "9912")]
port: u16,

/// Seconds to wait in between publishing pending proposals
#[clap(long, default_value = "7200")]
interval: i64,

/// Do not create merge proposals automatically.
#[clap(long, conflicts_with = "once")]
no_auto_publish: bool,

/// Path to load configuration from.
#[clap(long, default_value = "janitor.conf")]
config: std::path::PathBuf,

/// Use slow start rate limiter.
#[clap(long)]
slowstart: bool,

/// Only publish changes that were reviewed.
#[clap(long)]
reviewed_only: bool,

/// Limit number of pushes per cycle.
#[clap(long)]
push_limit: Option<i32>,

/// Require a binary diff when publishing merge requests.
#[clap(long)]
require_binary_diff: bool,

/// Maximum number of merge proposals to update per cycle.
#[clap(long)]
modify_mp_limit: Option<i32>,

/// External URL
#[clap(long)]
external_url: Option<Url>,

/// Print debugging info
#[clap(long)]
debug: bool,

/// Differ URL.
#[clap(long, default_value = "http://localhost:9920/")]
differ_url: Url,

#[clap(flatten)]
logging: janitor::logging::LoggingArgs,

/// Path to merge proposal templates
#[clap(long)]
template_env_path: Option<PathBuf>,
}

#[tokio::main]
async fn main() -> Result<(), i32> {
let args = Args::parse();

args.logging.init();

let config = Box::new(janitor::config::read_file(&args.config).map_err(|e| {
log::error!("Failed to read config: {}", e);
1
})?);

let config: &'static _ = Box::leak(config);

let bucket_rate_limiter: std::sync::Arc<Mutex<Box<dyn RateLimiter>>> =
std::sync::Arc::new(std::sync::Mutex::new(if args.slowstart {
Box::new(SlowStartRateLimiter::new(args.max_mps_per_bucket))
} else if let Some(max_mps_per_bucket) = args.max_mps_per_bucket {
Box::new(FixedRateLimiter::new(max_mps_per_bucket))
} else {
Box::new(NonRateLimiter)
}));

let forge_rate_limiter = Arc::new(Mutex::new(HashMap::new()));

let vcs_managers = Box::new(janitor::vcs::get_vcs_managers_from_config(config));
let vcs_managers: &'static _ = Box::leak(vcs_managers);
let db = janitor::state::create_pool(config).await.map_err(|e| {
log::error!("Failed to create database pool: {}", e);
1
})?;

let redis_async_connection = if let Some(redis_location) = config.redis_location.as_ref() {
let client = redis::Client::open(redis_location.to_string()).map_err(|e| {
log::error!("Failed to create redis client: {}", e);
1
})?;

Some(
redis::aio::ConnectionManager::new(client)
.await
.map_err(|e| {
log::error!("Failed to create redis async connection: {}", e);
1
})?,
)
} else {
None
};

let lock_manager = config
.redis_location
.as_deref()
.map(|redis_location| rslock::LockManager::new(vec![redis_location]));

let publish_worker = Arc::new(Mutex::new(
janitor_publish::PublishWorker::new(
args.template_env_path,
args.external_url,
args.differ_url,
redis_async_connection.clone(),
lock_manager,
)
.await,
));

if args.once {
janitor_publish::publish_pending_ready(
db.clone(),
redis_async_connection.clone(),
config,
publish_worker.clone(),
bucket_rate_limiter.clone(),
vcs_managers,
args.push_limit,
args.require_binary_diff,
)
.await
.map_err(|e| {
log::error!("Failed to publish pending proposals: {}", e);
1
})?;

if let Some(prometheus) = args.prometheus.as_ref() {
janitor::prometheus::push_to_gateway(
prometheus,
"janitor.publish",
maplit::hashmap! {},
prometheus::default_registry(),
)
.await
.unwrap();
}
} else {
tokio::spawn(janitor_publish::process_queue_loop(
db.clone(),
redis_async_connection.clone(),
config,
publish_worker.clone(),
bucket_rate_limiter.clone(),
forge_rate_limiter.clone(),
vcs_managers,
chrono::Duration::seconds(args.interval),
!args.no_auto_publish,
args.push_limit,
args.modify_mp_limit,
args.require_binary_diff,
));

tokio::spawn(janitor_publish::refresh_bucket_mp_counts(
db.clone(),
bucket_rate_limiter.clone(),
));

tokio::spawn(janitor_publish::listen_to_runner(
db.clone(),
redis_async_connection.clone(),
config,
publish_worker.clone(),
bucket_rate_limiter.clone(),
vcs_managers,
args.require_binary_diff,
));

let app = janitor_publish::web::app(
publish_worker.clone(),
bucket_rate_limiter.clone(),
forge_rate_limiter.clone(),
vcs_managers,
db.clone(),
args.require_binary_diff,
args.modify_mp_limit,
args.push_limit,
redis_async_connection.clone(),
config,
);

// run it
let addr = SocketAddr::new(args.listen_address, args.port);
log::info!("listening on {}", addr);

let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
log::error!("Failed to bind listener: {}", e);
1
})?;
axum::serve(listener, app.into_make_service())
.await
.map_err(|e| {
log::error!("Server error: {}", e);
1
})?;
}

Ok(())
}
54 changes: 25 additions & 29 deletions publish/src/lib.rs
@@ -4,11 +4,12 @@ use breezyshim::RevisionId;
use chrono::{DateTime, Utc};
use janitor::config::Campaign;
use janitor::publish::Mode;
-use janitor::vcs::VcsManager;
+use janitor::vcs::{VcsManager, VcsType};
use reqwest::header::HeaderMap;
use serde::ser::SerializeStruct;
use std::collections::HashMap;
use std::path::PathBuf;
+use std::sync::{Arc, Mutex};

pub mod publish_one;
pub mod rate_limiter;
@@ -218,7 +219,7 @@ pub struct PublishWorker {
pub template_env_path: Option<PathBuf>,
pub external_url: Option<url::Url>,
pub differ_url: url::Url,
-pub redis: Option<redis::aio::MultiplexedConnection>,
+pub redis: Option<redis::aio::ConnectionManager>,
pub lock_manager: Option<rslock::LockManager>,
}

@@ -270,7 +271,7 @@ async fn run_worker_process(
.stderr(std::process::Stdio::piped())
.spawn()?;

-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::io::AsyncWriteExt;
p.stdin
.as_mut()
.unwrap()
@@ -311,18 +312,13 @@ async fn run_worker_process(
}

impl PublishWorker {
-async fn new(
+pub async fn new(
template_env_path: Option<PathBuf>,
external_url: Option<url::Url>,
differ_url: url::Url,
-redis: Option<redis::Client>,
+redis: Option<redis::aio::ConnectionManager>,
lock_manager: Option<rslock::LockManager>,
) -> Self {
-let redis = if let Some(redis) = redis {
-Some(redis.get_multiplexed_async_connection().await.unwrap())
-} else {
-None
-};
Self {
template_env_path,
external_url,
@@ -569,13 +565,13 @@ fn get_merged_by_user_url(url: &url::Url, user: &str) -> Result<Option<url::Url>
}

pub async fn process_queue_loop(
-db: &sqlx::PgPool,
-redis: &redis::aio::MultiplexedConnection,
+db: sqlx::PgPool,
+redis: Option<redis::aio::ConnectionManager>,
config: &janitor::config::Config,
-publish_worker: &PublishWorker,
-bucket_rate_limiter: &mut dyn rate_limiter::RateLimiter,
-forge_rate_limiter: &mut HashMap<Forge, chrono::DateTime<Utc>>,
-vcs_managers: Vec<Box<dyn VcsManager>>,
+publish_worker: Arc<Mutex<PublishWorker>>,
+bucket_rate_limiter: Arc<Mutex<Box<dyn rate_limiter::RateLimiter>>>,
+forge_rate_limiter: Arc<Mutex<HashMap<Forge, chrono::DateTime<Utc>>>>,
+vcs_managers: &HashMap<VcsType, Box<dyn VcsManager>>,
interval: chrono::Duration,
auto_publish: bool,
push_limit: Option<i32>,
@@ -586,32 +582,32 @@ pub async fn process_queue_loop(
}

pub async fn publish_pending_ready(
-db: &sqlx::PgPool,
-redis: &redis::aio::MultiplexedConnection,
+db: sqlx::PgPool,
+redis: Option<redis::aio::ConnectionManager>,
config: &janitor::config::Config,
-publish_worker: &PublishWorker,
-bucket_rate_limiter: &mut dyn rate_limiter::RateLimiter,
-vcs_managers: Vec<Box<dyn VcsManager>>,
+publish_worker: Arc<Mutex<PublishWorker>>,
+bucket_rate_limiter: Arc<Mutex<Box<dyn rate_limiter::RateLimiter>>>,
+vcs_managers: &HashMap<VcsType, Box<dyn VcsManager>>,
push_limit: Option<i32>,
require_binary_diff: bool,
-) {
+) -> Result<(), PublishError> {
todo!();
}

pub async fn refresh_bucket_mp_counts(
-db: &sqlx::PgPool,
-bucket_rate_limiter: &mut dyn rate_limiter::RateLimiter,
+db: sqlx::PgPool,
+bucket_rate_limiter: Arc<Mutex<Box<dyn rate_limiter::RateLimiter>>>,
) {
todo!();
}

pub async fn listen_to_runner(
-db: &sqlx::PgPool,
-redis: &redis::aio::MultiplexedConnection,
+db: sqlx::PgPool,
+redis: Option<redis::aio::ConnectionManager>,
config: &janitor::config::Config,
-publish_worker: &PublishWorker,
-bucket_rate_limiter: &mut dyn rate_limiter::RateLimiter,
-vcs_managers: Vec<Box<dyn VcsManager>>,
+publish_worker: Arc<Mutex<PublishWorker>>,
+bucket_rate_limiter: Arc<Mutex<Box<dyn rate_limiter::RateLimiter>>>,
+vcs_managers: &HashMap<VcsType, Box<dyn VcsManager>>,
require_binary_diff: bool,
) {
todo!();
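For context: the signature changes in publish/src/lib.rs move from &mut trait-object borrows to Arc<Mutex<Box<dyn RateLimiter>>> so one limiter can be shared by the queue loop, the refresh task, the runner listener, and the web app. Below is a minimal sketch of that ownership pattern; the RateLimiter trait and its allow() method here are hypothetical stand-ins, not the crate's actual API.

use std::sync::{Arc, Mutex};

// Hypothetical stand-in for janitor_publish::rate_limiter::RateLimiter,
// only to illustrate the Arc<Mutex<Box<dyn ...>>> sharing pattern.
trait RateLimiter: Send {
    fn allow(&mut self, bucket: &str) -> bool;
}

struct NonRateLimiter;

impl RateLimiter for NonRateLimiter {
    fn allow(&mut self, _bucket: &str) -> bool {
        true
    }
}

#[tokio::main]
async fn main() {
    let limiter: Arc<Mutex<Box<dyn RateLimiter>>> =
        Arc::new(Mutex::new(Box::new(NonRateLimiter)));

    // Each background task gets its own Arc clone; the Mutex serializes access,
    // and the guard is dropped before any .await point.
    let for_task = limiter.clone();
    let task = tokio::spawn(async move {
        let allowed = for_task.lock().unwrap().allow("some-bucket");
        println!("allowed: {}", allowed);
    });
    task.await.unwrap();
}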