Reintegrate datagen #83

Merged: 10 commits, merged on Mar 13, 2024

Changes from all commits

1 change: 1 addition & 0 deletions .github/workflows/dev-env-deploy.yml
@@ -114,6 +114,7 @@ jobs:

# development only
- run: pip install jurigged
- run: pip install pytest
- run: apt-get update && apt-get install -y procps
- run: git config --global user.email "mail@ruicampos.org"
- run: git config --global user.name "Rui Campos"
2 changes: 1 addition & 1 deletion .github/workflows/text-classification-test.yml
@@ -39,5 +39,5 @@ jobs:
- name: Login to prefect
run: yes | prefect cloud login --key ${{ secrets.PREFECT_API_KEY }} --workspace 'digital-defiance/default'

- run: pytest pipelines/text_classification/datagen.py -vvv
- run: python -m pytest pipelines/text_classification/data.py -vvv
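Switching from the bare pytest entry point to python -m pytest also puts the current working directory on sys.path, which is what lets the absolute pipelines.text_classification.* imports introduced in this PR resolve when the suite runs from the repository root. A minimal illustration (assuming the command is invoked from the repo root):

# importable under `python -m pytest` because the repo root is on sys.path
from pipelines.text_classification.constants import DEV_RUST_BINARY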

7 changes: 3 additions & 4 deletions pipelines/commons.py
@@ -6,10 +6,9 @@
from typing import Callable

def shell_task(command_factory: Callable) -> Task:
@task
@wraps(command_factory)
def prefect_task(*args, shell_env = {}, **kwargs):
@task(name = command_factory.__name__)
async def prefect_task(*args, shell_env = {}, **kwargs):
shell_command = command_factory(*args, **kwargs)
shell_operation = ShellOperation(commands=[shell_command], env = shell_env)
return shell_operation.run()
return await shell_operation.run()
return prefect_task
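A minimal usage sketch of the async shell_task wrapper above (the list_workdir factory and its command are illustrative, not part of the PR): the decorated factory only builds the command string, and the generated Prefect task runs it through ShellOperation, so callers now await it from an async flow.

from prefect import flow
from pipelines.commons import shell_task

@shell_task
def list_workdir(path: str):
    return f"ls -la {path}"

@flow
async def demo():
    # executes the command via ShellOperation; extra environment variables go in through shell_env
    await list_workdir(".", shell_env={"LC_ALL": "C"})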
Empty file.
2 changes: 1 addition & 1 deletion pipelines/text_classification/constants.py
@@ -21,6 +21,6 @@

DEFAULT_ATTENTION_MECHANISM: AttentionMechanisms = "quadratic"
SAVE_PATH: str = "output.safetensors"
DEV_RUST_BINARY: str = "/__w/llm-voice-chat/llm-voice-chat/target/debug/llm-voice-chat"

DEV_RUST_BINARY: str = "/__w/llm-voice-chat/llm-voice-chat/target/debug/llm-voice-chat"

pipelines/text_classification/data.py
@@ -4,15 +4,20 @@
from numpy.random import default_rng
from prefect import get_run_logger, task, flow
import tiktoken
from inputs import Model
from pipelines.text_classification.inputs import Model

from safetensors import torch as stt
from typing import Literal
from functools import wraps
from torch.nn.utils.rnn import pad_sequence
from torch import tensor

import pytest





Sentiment = Literal["pos", "neg"]


@@ -22,6 +27,9 @@ def encode_text(text: str) -> list[int]:
return ENCODER.encode(text.lower())





def execute(func):

@task(name=func.__name__)
@@ -40,10 +48,13 @@ def prefect_task(conn: duckdb.DuckDBPyConnection, *args, **kwargs) -> list:
sql_cmd, sql_args = func(*args, **kwargs)
logger.debug("Executing command: %", sql_cmd)

print(sql_cmd, sql_args)
return conn.executemany(sql_cmd, sql_args)
return prefect_task





@execute
def create_dataset_table(source: str):
return f"""
@@ -118,6 +129,7 @@ def raw_data_to_tensor(raw_sentiments, raw_reviews):
def prepare_slices(conn, rng, epochs: int, number_of_partions: int, data_source: str, folder: str, name_prefix = ""):



logger = get_run_logger()

logger.info(f"Initializing table for {folder}")
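A minimal usage sketch of the execute decorator defined above (the reviews table and the insert_reviews helper are hypothetical, not part of the PR): the decorated function returns a (sql, parameters) pair, and the generated Prefect task binds it to a DuckDB connection via executemany when called inside a flow run.

import duckdb
from prefect import flow
from pipelines.text_classification.data import execute

@execute
def insert_reviews(rows: list[tuple[str, str]]):
    return "INSERT INTO reviews VALUES (?, ?)", rows

@flow
def load_reviews():
    with duckdb.connect() as conn:
        conn.execute("CREATE TABLE reviews (text VARCHAR, sentiment VARCHAR)")
        insert_reviews(conn, [("a fine film", "pos"), ("not worth it", "neg")])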
98 changes: 64 additions & 34 deletions pipelines/text_classification/deploy.py
@@ -4,51 +4,63 @@


from pipelines.text_classification.run_rust import run_rust_binary, make_rust_executable, download_rust_binary
from datagen import prepare_validation_slice, write_training_slices, write_test_slices

from pipelines.text_classification.data import prepare_slices
from pipelines.text_classification.constants import DEV_RUST_BINARY
from typing import Literal
import mlflow
from pydantic_settings import BaseSettings
from prefect import flow, task
from constants import DEV_RUST_BINARY
from prefect import flow, task, get_run_logger
from pipelines.text_classification.inputs import Data, Train, Settings, Model, TrainingProcess, MLFLowSettings
from anyio import run
import duckdb
from pipelines.commons import shell_task

from numpy.random import default_rng


@task
def log_params():
async def log_params(settings: Settings):
settings.to_env()
mlflow.log_params({
**TrainingProcess().model_dump(),
**Model().model_dump(),
**Train().model_dump(),
**Data().model_dump(),
})





@shell_task
def clean_tmp():
return "rm -rf tmp"
def remove_folder(folder: str):
return f"rm -rf {folder}"

@shell_task
def create_tmp():
return "mkdir -p tmp"
def create_folder(folder: str):
return f"mkdir -p {folder}"



@flow
def main(
process: TrainingProcess,
model: Model,
train: Train,
data: Data,
async def main(
process: TrainingProcess = TrainingProcess(),
model: Model = Model(),
train: Train = Train(),
data: Data = Data(),
experiment_id: int = 1,
run_name: str | None = None,
):
):
# logger = get_run_logger()

for folder in ["test", "train"]:
await remove_folder(folder)
await create_folder(folder)



with mlflow.start_run(run_name=run_name, experiment_id=experiment_id) as run:

Settings(
settings = Settings(
process = process,
model = model,
train = train,
@@ -58,39 +70,57 @@ def main(
run_name=run_name,
mlflow_run_id=run.info.run_id
)
).to_env()

log_params.submit()

clean_tmp()

create_tmp()
)

prepare_validation_slice.submit()
settings.to_env()
await log_params.submit(settings)

path_to_rust_binary = DEV_RUST_BINARY
if process.executable_source != DEV_RUST_BINARY:
path_to_rust_binary = "./train"
download_rust_binary(process.executable_source, path_to_rust_binary)
make_rust_executable(path_to_rust_binary)


training_slices = write_training_slices.submit()

training_loop = run_rust_binary.submit(
training_loop = await run_rust_binary.submit(
path_to_rust_binary,
shell_env = { key: value for key, value in Settings.from_env().yield_flattened_items() }
)

training_slices.wait()
write_test_slices.submit()
training_loop.wait()
with duckdb.connect() as conn:
flow_rng = default_rng(seed=42)
prepare_slices(
conn,
flow_rng,
train.epochs,
data.slices,
data.train_source,
"train"
)

with duckdb.connect() as conn:
flow_rng = default_rng(seed=42)
prepare_slices(
conn,
flow_rng,
1,
data.slices,
data.test_source,
"test"
)

await training_loop.wait()



if __name__ == "__main__":
class EnvironmentSettings(BaseSettings):
llmvc_environment: Literal["production", "staging", "development"] = "production"

main.serve(
name = f"text-classification-{EnvironmentSettings().llmvc_environment}"
)
if EnvironmentSettings().llmvc_environment == "development":
run(main)
else:
main.serve(
name = f"text-classification-{EnvironmentSettings().llmvc_environment}"
)

21 changes: 11 additions & 10 deletions pipelines/text_classification/inputs.py
@@ -1,33 +1,34 @@
from constants import DEV_RUST_BINARY, DEFAULT_ATTENTION_MECHANISM
from pipelines.text_classification.constants import DEV_RUST_BINARY, DEFAULT_ATTENTION_MECHANISM
from pydantic_settings import BaseSettings
from typing import Literal
import os
import torch.cuda

from constants import SourceExecutable, AttentionMechanisms
from pipelines.text_classification.constants import SourceExecutable, AttentionMechanisms
from pydantic import model_validator

class Data(BaseSettings):
train_source: str = "https://github.com/Digital-Defiance/IMBd-dataset/raw/main/dataset/train.parquet"
test_source: str = "https://github.com/Digital-Defiance/IMBd-dataset/raw/main/dataset/test.parquet"
slices: int = 10
batch_size: int = 64
slices: int = 1
batch_size: int = 32

class TrainingProcess(BaseSettings):
use_gpu: bool = True
use_gpu: bool = torch.cuda.is_available()
executable_source: SourceExecutable = DEV_RUST_BINARY


class Train(BaseSettings):
epochs: int = 10
epochs: int = 40
learning_rate: float = 1e-4


class Model(BaseSettings):
encoding: Literal["tiktoken-gpt2"] = "tiktoken-gpt2"
attention_kind: AttentionMechanisms = DEFAULT_ATTENTION_MECHANISM
dimension: int = 64
depth: int = 3
heads: int = 3
attention_kind: AttentionMechanisms = DEFAULT_ATTENTION_MECHANISM
dimension: int = 300
depth: int = 5
heads: int = 6
context_window: int = 300
input_vocabolary: int = 60_000
output_vocabolary: int = 5
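Because these classes extend pydantic's BaseSettings, every default above can also be overridden from the environment without code changes (field names match case-insensitively, assuming no env_prefix is configured). A minimal sketch:

import os
from pipelines.text_classification.inputs import Model, Train

os.environ["EPOCHS"] = "10"      # overrides Train.epochs (default 40)
os.environ["DIMENSION"] = "64"   # overrides Model.dimension (default 300)

assert Train().epochs == 10
assert Model().dimension == 64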
4 changes: 2 additions & 2 deletions pipelines/text_classification/run_rust.py
@@ -1,5 +1,6 @@

from ..commons import shell_task
from pipelines.commons import shell_task



@shell_task
@@ -15,4 +16,3 @@ def run_rust_binary(path_to_rust_binary: str):
shell_command = path_to_rust_binary
return shell_command


2 changes: 1 addition & 1 deletion src/attention/avg_pooling.rs
@@ -41,7 +41,7 @@ impl Module for AvgPooling {
DEFAULT_CEIL_MODE,
DEFAULT_COUNT_INCLUDE_PAD,
DEFAULT_DIVISOR_OVERRIDE,
)
) - x_bcd
}
}

5 changes: 1 addition & 4 deletions src/attention/quadratic_form.rs
@@ -2,10 +2,7 @@
use tch::nn;
use tch::Tensor;


pub fn generate_init() -> nn::Init {
nn::Init::Randn { mean: 0., stdev: 1. }
}
use crate::metaformer::commons::generate_init;

#[derive(Debug)]
pub struct QuadraticAttention {
4 changes: 1 addition & 3 deletions src/attention/scaled_dot_product.rs
@@ -2,9 +2,7 @@
use tch::nn;
use tch::Tensor;

pub fn generate_init() -> nn::Init {
nn::Init::Randn { mean: 0., stdev: 1. }
}
use crate::metaformer::commons::generate_init;



2 changes: 1 addition & 1 deletion src/config.rs
@@ -93,7 +93,7 @@ impl Cli {
print!("Current training device: CUDA");
match cuda {
Device::Cuda(_) => cuda,
_ => todo!(),
_ => panic!("Invalid device specification. Did you mean CPU ?"),
}
} else if self.use_gpu == "False" {
print!("Current training device: CPU");
4 changes: 2 additions & 2 deletions src/files.rs
@@ -19,9 +19,9 @@ fn wait(path: &Path) {
}


pub fn read_dataslice(global_idx: i64) -> std::collections::HashMap<String, tch::Tensor> {
pub fn read_dataslice(folder: &str, global_idx: i64) -> std::collections::HashMap<String, tch::Tensor> {
println!("Reading file...");
let path = format!("tmp/{}_output.safetensors", global_idx);
let path = format!("{}/{}_output.safetensors", folder, global_idx);
let path_to_slice = std::path::Path::new(&path);
wait(path_to_slice);
let dataslice = tch::Tensor::read_safetensors(path_to_slice).unwrap();
10 changes: 4 additions & 6 deletions src/main.rs
@@ -23,8 +23,6 @@ use tch;
use tch::nn;
use config::Cli;

const EVAL_SLICE_IDX: i64 = 0;


/// Implementation of gradient descent
fn main() {
@@ -45,16 +43,16 @@ fn main() {

for train_step in 1..(config.slices*config.epochs + 1) {
let avg_train_loss = model.perform_train_step(&config, training_device, train_step, &mut opt);
let mut metrics: Vec<Metric> = model.perform_eval(&config, EVAL_SLICE_IDX, train_step);
metrics.push(avg_train_loss);
log_metrics(&config, metrics);
// let mut metrics: Vec<Metric> = model.perform_eval(&config, EVAL_SLICE_IDX, train_step);
// metrics.push(avg_train_loss);
log_metrics(&config, vec![avg_train_loss]);
}


let displacement = 5 + config.slices*config.epochs;
for test_idx in 1..(config.slices + 1) {
let step = test_idx + displacement;
let metrics: Vec<Metric> = model.perform_eval(&config, -test_idx, step);
let metrics: Vec<Metric> = model.perform_eval(&config, test_idx, step);
log_metrics(&config, metrics);
}

2 changes: 1 addition & 1 deletion src/metaformer/commons.rs
@@ -2,5 +2,5 @@ use tch::nn


pub fn generate_init() -> nn::Init {
nn::Init::Randn { mean: 0., stdev: 1. }
tch::nn::init::DEFAULT_KAIMING_UNIFORM
}
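For reference, the shared initializer now defaults to a Kaiming (He) uniform draw instead of the previous unit-variance normal. In its general form the weights are sampled as W_ij ~ U(-b, b) with b = g * sqrt(3 / fan_in), where g is the gain; the exact gain and fan computation are those used by tch's DEFAULT_KAIMING_UNIFORM.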