Skip to content

Commit

Permalink
feat: add adaptivity option in perf test (#172)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gun9niR authored Apr 29, 2024
1 parent 5958b3d commit 6cc3287
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 6 deletions.
4 changes: 3 additions & 1 deletion optd-perftest/src/cardtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,12 @@ pub async fn cardtest_core<P: AsRef<Path>>(
pguser: &str,
pgpassword: &str,
benchmark: Benchmark,
adaptive: bool,
) -> anyhow::Result<HashMap<String, Vec<Cardinfo>>> {
let pg_dbms = Box::new(PostgresDBMS::build(&workspace_dpath, pguser, pgpassword)?);
let truecard_getter = pg_dbms.clone();
let df_dbms = Box::new(DatafusionDBMS::new(&workspace_dpath, rebuild_cached_optd_stats).await?);
let df_dbms =
Box::new(DatafusionDBMS::new(&workspace_dpath, rebuild_cached_optd_stats, adaptive).await?);
let dbmss: Vec<Box<dyn CardtestRunnerDBMSHelper>> = vec![pg_dbms, df_dbms];

let mut cardtest_runner = CardtestRunner::new(dbmss, truecard_getter).await?;
Expand Down
14 changes: 9 additions & 5 deletions optd-perftest/src/datafusion_dbms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use regex::Regex;
pub struct DatafusionDBMS {
workspace_dpath: PathBuf,
rebuild_cached_stats: bool,
adaptive: bool,
ctx: SessionContext,
}

Expand Down Expand Up @@ -71,11 +72,13 @@ impl DatafusionDBMS {
pub async fn new<P: AsRef<Path>>(
workspace_dpath: P,
rebuild_cached_stats: bool,
adaptive: bool,
) -> anyhow::Result<Self> {
Ok(DatafusionDBMS {
workspace_dpath: workspace_dpath.as_ref().to_path_buf(),
rebuild_cached_stats,
ctx: Self::new_session_ctx(None).await?,
adaptive,
ctx: Self::new_session_ctx(None, adaptive).await?,
})
}

Expand All @@ -85,12 +88,13 @@ impl DatafusionDBMS {
/// A more ideal way to generate statistics would be to use the `ANALYZE`
/// command in SQL, but DataFusion does not support that yet.
async fn clear_state(&mut self, stats: Option<DataFusionBaseTableStats>) -> anyhow::Result<()> {
self.ctx = Self::new_session_ctx(stats).await?;
self.ctx = Self::new_session_ctx(stats, self.adaptive).await?;
Ok(())
}

async fn new_session_ctx(
stats: Option<DataFusionBaseTableStats>,
adaptive: bool,
) -> anyhow::Result<SessionContext> {
let session_config = SessionConfig::from_env()?.with_information_schema(true);
let rn_config = RuntimeConfig::new();
Expand All @@ -101,7 +105,7 @@ impl DatafusionDBMS {
let optimizer: DatafusionOptimizer = DatafusionOptimizer::new_physical(
Arc::new(DatafusionCatalog::new(state.catalog_list())),
stats.unwrap_or_default(),
true,
adaptive,
);
state = state.with_physical_optimizer_rules(vec![]);
state = state.with_query_planner(Arc::new(OptdQueryPlanner::new(optimizer)));
Expand Down Expand Up @@ -355,7 +359,7 @@ impl DatafusionDBMS {
tpch_kit.gen_tables(tpch_kit_config)?;

// To get the schema of each table.
let ctx = Self::new_session_ctx(None).await?;
let ctx = Self::new_session_ctx(None, self.adaptive).await?;
let ddls = fs::read_to_string(&tpch_kit.schema_fpath)?;
let ddls = ddls
.split(';')
Expand Down Expand Up @@ -415,7 +419,7 @@ impl DatafusionDBMS {
job_kit.download_tables(job_kit_config)?;

// To get the schema of each table.
let ctx = Self::new_session_ctx(None).await?;
let ctx = Self::new_session_ctx(None, self.adaptive).await?;
let ddls = fs::read_to_string(&job_kit.schema_fpath)?;
let ddls = ddls
.split(';')
Expand Down
9 changes: 9 additions & 0 deletions optd-perftest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ enum Commands {
// system will use the cache by default.
rebuild_cached_optd_stats: bool,

#[clap(long)]
#[clap(help = "Whether to enable adaptivity for optd")]
#[clap(default_value = "true")]
adaptive: bool,

#[clap(long)]
#[clap(default_value = "default_user")]
#[clap(help = "The name of a user with superuser privileges")]
Expand Down Expand Up @@ -93,6 +98,7 @@ async fn cardtest<P: AsRef<Path>>(
rebuild_cached_optd_stats: bool,
pguser: String,
pgpassword: String,
adaptive: bool,
) -> anyhow::Result<()> {
let query_ids = if query_ids.is_empty() {
Vec::from(match benchmark_name {
Expand Down Expand Up @@ -130,6 +136,7 @@ async fn cardtest<P: AsRef<Path>>(
&pguser,
&pgpassword,
benchmark,
adaptive,
)
.await?;

Expand Down Expand Up @@ -263,6 +270,7 @@ async fn main() -> anyhow::Result<()> {
rebuild_cached_optd_stats,
pguser,
pgpassword,
adaptive,
} => {
cardtest(
workspace_dpath,
Expand All @@ -273,6 +281,7 @@ async fn main() -> anyhow::Result<()> {
rebuild_cached_optd_stats,
pguser,
pgpassword,
adaptive,
)
.await
}
Expand Down

0 comments on commit 6cc3287

Please sign in to comment.