From bd812c2bf6ee199e33c8375e9679dbdfe56bd514 Mon Sep 17 00:00:00 2001 From: Sunyanan Choochotkaew Date: Thu, 28 Mar 2024 21:19:11 +0900 Subject: [PATCH] update kepler-model-db link, serve by spec, add db CI, visualize power curve Signed-off-by: Sunyanan Choochotkaew --- .github/workflows/integration-test.yml | 37 ++--- .github/workflows/pr.yml | 29 +++- .github/workflows/unit-test.yml | 1 + Makefile | 4 +- README.md | 3 +- cmd/README.md | 2 +- cmd/cmd_plot.py | 111 +++++++++++++ cmd/main.py | 40 +++-- .../base/patch/patch-estimator-sidecar.yaml | 5 +- manifests/kepler/patch/patch-ci.yaml | 2 +- manifests/test/patch-estimator-sidecar.yaml | 4 +- src/estimate/estimator.py | 14 +- src/estimate/model/curvefit_model.py | 10 +- src/estimate/model/keras_model.py | 4 +- src/estimate/model/model.py | 17 +- src/estimate/model/scikit_model.py | 4 +- src/estimate/model/xgboost_model.py | 4 +- src/estimate/model_server_connector.py | 13 +- src/server/model_server.py | 147 +++++++++++++----- src/train/ec2_pipeline.py | 87 +++++++++++ src/train/exporter/validator.py | 5 + src/train/exporter/writer.py | 11 +- src/train/isolator/train_isolator.py | 8 +- src/train/offline_trainer.py | 4 +- src/train/online_trainer.py | 6 +- src/train/pipeline.py | 8 +- src/train/profiler/node_type_index.py | 14 +- src/train/specpower_pipeline.py | 127 +++++++++++++++ src/train/trainer/SVRRegressorTrainer/main.py | 3 +- src/util/__init__.py | 2 +- src/util/config.py | 21 ++- src/util/loader.py | 89 ++++++++--- src/util/saver.py | 3 + tests/README.md | 6 +- tests/e2e_test.sh | 44 ++++-- tests/estimator_model_request_test.py | 13 +- tests/estimator_model_test.py | 4 +- tests/estimator_power_request_test.py | 18 ++- tests/model_server_test.py | 9 +- tests/offline_trainer_test.py | 2 +- tests/pipeline_test.py | 22 ++- tests/trainer_test.py | 6 +- 42 files changed, 764 insertions(+), 199 deletions(-) create mode 100644 src/train/ec2_pipeline.py create mode 100644 src/train/specpower_pipeline.py diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml index 3fd9385f..72473ec6 100644 --- a/.github/workflows/integration-test.yml +++ b/.github/workflows/integration-test.yml @@ -22,6 +22,10 @@ on: description: 'Kepler image tag' required: true type: string + additional_opts: + description: 'additional deployment opts' + required: true + type: string env: BASE_IMAGE: ${{ inputs.image_repo }}/kepler_model_server_base:${{ inputs.image_tag }} @@ -65,45 +69,24 @@ jobs: curl -s "https://raw.githubusercontent.com/kubernetes-sigs/kustomize/master/hack/install_kustomize.sh" | bash chmod +x kustomize mv kustomize /usr/local/bin/ - - name: test deploying kepler with only estimator - run: | - make deploy - make e2e-test - make cleanup - env: - OPTS: "ESTIMATOR" - - name: test deploying kepler with only server - run: | - make deploy - make e2e-test - make cleanup - env: - OPTS: "SERVER" - - name: test deploying kepler with estimator and model server - run: | - make deploy - make e2e-test - make cleanup - env: - OPTS: "ESTIMATOR SERVER" - - name: test deploying dummy kepler with only estimator + - name: test deploying with only estimator run: | make deploy make e2e-test make cleanup env: - OPTS: "ESTIMATOR TEST" - - name: test deploying dummy kepler with only server + OPTS: "ESTIMATOR${{ inputs.additional_opts }}" + - name: test deploying with only server run: | make deploy make e2e-test make cleanup env: - OPTS: "SERVER TEST" - - name: test deploying dummy kepler with estimator and model server + OPTS: 
"SERVER${{ inputs.additional_opts }}" + - name: test deploying with estimator and model server run: | make deploy make e2e-test make cleanup env: - OPTS: "ESTIMATOR SERVER TEST" \ No newline at end of file + OPTS: "ESTIMATOR SERVER${{ inputs.additional_opts }}" \ No newline at end of file diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index e9094bf9..9215e9a1 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -165,7 +165,7 @@ jobs: image_tag: ${{ needs.check-branch.outputs.tag }} pipeline_name: std_v0.7 - integration-test: + integration-test-internal-only: needs: [check-secret, check-branch, check-change, base-image] if: always() uses: ./.github/workflows/integration-test.yml @@ -174,4 +174,29 @@ jobs: docker_secret: ${{ needs.check-secret.outputs.docker-secret }} image_repo: ${{ vars.IMAGE_REPO || 'docker.io/library' }} image_tag: ${{ needs.check-branch.outputs.tag }} - kepler_tag: release-0.7.7 \ No newline at end of file + kepler_tag: release-0.7.7 + additional_opts: " TEST" + + integration-test-with-exporter: + needs: [check-secret, check-branch, check-change, base-image] + if: always() + uses: ./.github/workflows/integration-test.yml + with: + base_change: ${{ needs.check-change.outputs.base }} + docker_secret: ${{ needs.check-secret.outputs.docker-secret }} + image_repo: ${{ vars.IMAGE_REPO || 'docker.io/library' }} + image_tag: ${{ needs.check-branch.outputs.tag }} + kepler_tag: release-0.7.7 + additional_opts: "" + + integration-test-with-exporter-and-db: + needs: [check-secret, check-branch, check-change, base-image] + if: always() + uses: ./.github/workflows/integration-test.yml + with: + base_change: ${{ needs.check-change.outputs.base }} + docker_secret: ${{ needs.check-secret.outputs.docker-secret }} + image_repo: ${{ vars.IMAGE_REPO || 'docker.io/library' }} + image_tag: ${{ needs.check-branch.outputs.tag }} + kepler_tag: release-0.7.7 + additional_opts: " DB" \ No newline at end of file diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml index 576d875c..cffde569 100644 --- a/.github/workflows/unit-test.yml +++ b/.github/workflows/unit-test.yml @@ -32,6 +32,7 @@ jobs: run: make test-pipeline - name: Test model server run: make test-model-server + timeout-minutes: 5 - name: Test estimator run: make test-estimator timeout-minutes: 5 diff --git a/Makefile b/Makefile index 5f6e4936..7688fd9d 100644 --- a/Makefile +++ b/Makefile @@ -48,8 +48,8 @@ test-estimator: run-estimator run-collector-client clean-estimator # test estimator --> model-server run-model-server: - $(CTR_CMD) run -d --platform linux/amd64 -e "MODEL_TOPURL=http://localhost:8110" -v ${MODEL_PATH}:/mnt/models -p 8100:8100 --name model-server $(TEST_IMAGE) /bin/bash -c "python3.8 tests/http_server.py & sleep 5 && python3.8 src/server/model_server.py" - sleep 5 + $(CTR_CMD) run -d --platform linux/amd64 -e "MODEL_TOPURL=http://localhost:8110" -v ${MODEL_PATH}:/mnt/models -p 8100:8100 --name model-server $(TEST_IMAGE) /bin/bash -c "python3.8 tests/http_server.py & sleep 10 && python3.8 src/server/model_server.py" + while ! 
docker logs model-server | grep -q Serving; do echo "waiting for model-server to serve"; sleep 5; done run-estimator-client: $(CTR_CMD) exec model-server /bin/bash -c "python3.8 -u ./tests/estimator_model_request_test.py" diff --git a/README.md b/README.md index 71519b2a..e8d80460 100644 --- a/README.md +++ b/README.md @@ -75,8 +75,9 @@ Compatible version: python 3.8 |Test case|Command| |---|---| |[Training pipeline](./tests/README.md#pipeline)|python -u ./tests/pipeline_test.py| - |[Model server](./tests/README.md#estimator-model-request-to-model-server)|Terminal 1: python src/server/model_server.py
Terminal 2: python -u tests/estimator_model_request_test.py|
+  |[Model server](./tests/README.md#estimator-model-request-to-model-server)|Terminal 1: export MODEL_PATH=$(pwd)/tests/models;python src/server/model_server.py
Terminal 2: python -u tests/estimator_model_request_test.py|
   |[Estimator](./tests/README.md#estimator-power-request-from-collector)|Terminal 1: python src/estimate/estimator.py
Terminal 2: python -u tests/estimator_power_request_test.py|
+  |Estimator with Model Server|Terminal 1: export MODEL_PATH=$(pwd)/tests/models;python src/server/model_server.py
Terminal 2: export MODEL_SERVER_URL=http://localhost:8100;export MODEL_SERVER_ENABLE=true;python -u src/estimate/estimator.py
Terminal 3: python -u tests/estimator_power_request_test.py|
   |[Offline Trainer](./tests/README.md#offline-trainer)|Terminal 1: python src/train/offline_trainer.py
Terminal 2: python -u tests/offline_trainer_test.py| For more test information, check [here](./tests/). diff --git a/cmd/README.md b/cmd/README.md index c3d8bd64..659de1d3 100644 --- a/cmd/README.md +++ b/cmd/README.md @@ -85,7 +85,7 @@ Use kepler model server function as a standalone docker container. 5.3. Plot prediction result on specific trainer model and feature group (`estimate`) ```bash - docker run --rm -v "$(pwd)/data":/data quay.io/sustainable_computing_io/kepler_model_server:v0.7 plot --target-data estimate -i output_kepler_query --model-name GradientBoostingRegressorTrainer_1 --feature-group BPFOnly + docker run --rm -v "$(pwd)/data":/data quay.io/sustainable_computing_io/kepler_model_server:v0.7 plot --target-data estimate -i output_kepler_query --model-name GradientBoostingRegressorTrainer_0 --feature-group BPFOnly ``` 5.4. Plot prediction error comparison among feature group and trainer model (`error`) diff --git a/cmd/cmd_plot.py b/cmd/cmd_plot.py index 040640a2..cfc60b1f 100644 --- a/cmd/cmd_plot.py +++ b/cmd/cmd_plot.py @@ -9,6 +9,11 @@ from util.prom_types import TIMESTAMP_COL from util import PowerSourceMap +from util.train_types import FeatureGroup, ModelOutputType, weight_support_trainers +from util.loader import load_metadata, load_scaler, get_model_group_path +from train.profiler.node_type_index import NodeTypeIndexCollection +from estimate import load_model +markers = ['o', 's', '^', 'v', '<', '>', 'p', 'P', '*', 'x', '+', '|', '_'] def ts_plot(data, cols, title, output_folder, name, labels=None, subtitles=None, ylabel=None): plot_height = 3 @@ -147,4 +152,110 @@ def metadata_plot(args, energy_source, metadata_df, output_folder, name): plt.legend(frameon=False) filename = os.path.join(output_folder, name + ".png") fig.savefig(filename) + plt.close() + +def power_curve_plot(args, data_path, energy_source, output_folder, name): + model_toppath = data_path + pipeline_name = args.pipeline_name + pipeline_path = os.path.join(model_toppath, pipeline_name) + node_collection = NodeTypeIndexCollection(pipeline_path) + all_node_types = sorted(list(node_collection.node_type_index.keys())) + output_type = ModelOutputType[args.output_type] + models, _, cpu_ms_max = _load_all_models(model_toppath=model_toppath, output_type=output_type, name=pipeline_name, node_types=all_node_types, energy_source=energy_source) + if len(models) > 0: + _plot_models(models, cpu_ms_max, energy_source, output_folder, name) + +def _get_model(model_toppath, trainer, model_node_type, output_type, name, energy_source): + feature_group = FeatureGroup.BPFOnly + model_name = "{}_{}".format(trainer, model_node_type) + group_path = get_model_group_path(model_toppath, output_type, feature_group, energy_source, name) + model_path = os.path.join(group_path, model_name) + model = load_model(model_path) + metadata = load_metadata(model_path) + if metadata is None: + return model, None, None + scaler = load_scaler(model_path) + cpu_ms_max = scaler.max_abs_[0] + return model, metadata, cpu_ms_max + +def _load_all_models(model_toppath, output_type, name, node_types, energy_source): + models_dict = dict() + metadata_dict = dict() + cpu_ms_max_dict = dict() + for model_node_type in node_types: + min_mae = None + for trainer in weight_support_trainers: + model, metadata, cpu_ms_max = _get_model(model_toppath, trainer, model_node_type, output_type=output_type, name=name, energy_source=energy_source) + if metadata is None: + continue + cpu_ms_max_dict[model_node_type] = cpu_ms_max + if min_mae is None or min_mae > 
metadata["mae"]: + min_mae = metadata["mae"] + models_dict[model_node_type], metadata_dict[model_node_type] = model, metadata + return models_dict, metadata_dict, cpu_ms_max_dict + +def _plot_models(models, cpu_ms_max, energy_source, output_folder, name, max_plot=15, cpu_time_bin_num=10, sample_num=20): + from util.train_types import BPF_FEATURES + import numpy as np + import pandas as pd + import seaborn as sns + sns.set_palette("Paired") + + import matplotlib.pyplot as plt + + main_feature_col = BPF_FEATURES[0] + predicted_col = { + "acpi": "default_platform_power", + "intel_rapl": "default_package_power" + } + + num_bins = len(cpu_ms_max)//cpu_time_bin_num + 1 + nobin = False + if num_bins == 1: + nobin = True + values = np.array(list(cpu_ms_max.values())) + _, bins = np.histogram(values, bins=num_bins) + bin_size = len(bins) + 1 if not nobin else 1 + data_with_prediction_list = [[] for _ in range(bin_size)] + + num_cols = min(3, bin_size) + + for node_type, model in models.items(): + # generate data from scaler + xs = np.column_stack((np.linspace(0, cpu_ms_max[node_type], sample_num), np.zeros(sample_num))) + data = pd.DataFrame(xs, columns=models[node_type].estimator.features) + _, data_with_prediction = model.append_prediction(data) + if nobin: + bin_index = 0 + else: + bin_index = np.digitize([cpu_ms_max[node_type]], bins)[0] + data_with_prediction_list[bin_index] += [(node_type, data_with_prediction)] + total_graphs = 0 + for data_with_predictions in data_with_prediction_list: + total_graphs += int(np.ceil(len(data_with_predictions) / max_plot)) + num_rows = int(np.ceil(total_graphs/num_cols)) + + fig, axes = plt.subplots(num_rows, num_cols, figsize=(int(6*num_cols), int(5*num_rows))) + axes_index = 0 + for data_with_predictions in data_with_prediction_list: + index = 0 + for data_with_prediction_index in data_with_predictions: + if num_rows == 1 and num_cols == 1: + ax = axes + else: + ax = axes[axes_index//num_cols][axes_index%num_cols] + node_type = data_with_prediction_index[0] + data_with_prediction = data_with_prediction_index[1] + sns.lineplot(data=data_with_prediction, x=main_feature_col, y=predicted_col[energy_source], label="type={}".format(node_type), marker=markers[index], ax=ax) + index += 1 + index = index % len(markers) + if index % max_plot == 0: + ax.set_ylabel("Predicted power (W)") + axes_index += 1 + if len(data_with_predictions) > 0: + ax.set_ylabel("Predicted power (W)") + axes_index += 1 + filename = os.path.join(output_folder, name + ".png") + plt.tight_layout() + fig.savefig(filename) plt.close() \ No newline at end of file diff --git a/cmd/main.py b/cmd/main.py index ade16e17..9daf0e99 100644 --- a/cmd/main.py +++ b/cmd/main.py @@ -18,8 +18,8 @@ from util.prom_types import metric_prefix as KEPLER_METRIC_PREFIX, prom_responses_to_results, TIMESTAMP_COL, feature_to_query, update_thirdparty_metrics, node_info_column from util.extract_types import get_expected_power_columns from util.train_types import ModelOutputType, FeatureGroups, is_single_source_feature_group, all_feature_groups, default_trainers -from util.loader import load_json, DEFAULT_PIPELINE, load_pipeline_metadata, get_pipeline_path, get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_preprocess_folder, get_general_filename, load_machine_spec -from util.saver import save_json, save_csv, save_train_args, _pipeline_model_metadata_filename +from util.loader import default_train_output_pipeline, load_json, load_pipeline_metadata, get_pipeline_path, 
get_model_group_path, list_pipelines, list_model_names, load_metadata, load_csv, get_preprocess_folder, get_general_filename, load_machine_spec +from util.saver import save_json, save_csv, save_train_args, _pipeline_model_metadata_filename, _power_curve_filename from util.config import ERROR_KEY, model_toppath from util import get_valid_feature_group_from_queries, PowerSourceMap from train.prom.prom_query import _range_queries @@ -27,7 +27,7 @@ from train import load_class from train.profiler.node_type_index import NodeTypeIndexCollection, NodeTypeSpec, generate_spec -from cmd_plot import ts_plot, feature_power_plot, summary_plot, metadata_plot +from cmd_plot import ts_plot, feature_power_plot, summary_plot, metadata_plot, power_curve_plot from cmd_util import extract_time, save_query_results, get_validate_df, summary_validation, get_extractor, check_ot_fg, get_pipeline, assert_train, get_isolator, UTC_OFFSET_TIMEDELTA import threading @@ -215,7 +215,7 @@ def isolate(args): extracted_data, power_labels = extract(args) if extracted_data is None or power_labels is None: return None - pipeline_name = DEFAULT_PIPELINE if not args.pipeline_name else args.pipeline_name + pipeline_name = default_train_output_pipeline if not args.pipeline_name else args.pipeline_name isolator = get_isolator(data_path, args.isolator, args.profile, pipeline_name, args.target_hints, args.bg_hints, args.abs_pipeline_name) isolated_data = isolator.isolate(extracted_data, label_cols=power_labels, energy_source=args.energy_source) if args.output: @@ -247,7 +247,7 @@ def isolate_from_data(args): energy_components = PowerSourceMap[args.energy_source] extracted_data = load_csv(data_path, "extracted_" + args.input) power_columns = get_expected_power_columns(energy_components=energy_components) - pipeline_name = DEFAULT_PIPELINE if not args.pipeline_name else args.pipeline_name + pipeline_name = default_train_output_pipeline if not args.pipeline_name else args.pipeline_name isolator = get_isolator(data_path, args.isolator, args.profile, pipeline_name, args.target_hints, args.bg_hints, args.abs_pipeline_name) isolated_data = isolator.isolate(extracted_data, label_cols=power_columns, energy_source=args.energy_source) if args.output: @@ -365,7 +365,7 @@ def train(args): elif PROM_THIRDPARTY_METRICS != [""]: update_thirdparty_metrics(PROM_THIRDPARTY_METRICS) - pipeline_name = DEFAULT_PIPELINE + pipeline_name = default_train_output_pipeline if args.pipeline_name: pipeline_name = args.pipeline_name @@ -599,6 +599,8 @@ def estimate(args): - `estimate` passes all arguments to `estimate` function, and plots the predicted time series and correlation between usage and power metrics - `error` passes all arguments to `estimate` function, and plots the summary of prediction error - `metadata` plot pipeline metadata + - `curve_power` plot curve power +- --input : specify related path for pipeline metadata - --energy-source : specify target energy sources (use comma(,) as delimiter) - --extractor : specify extractor to get preprocessed data of AbsPower model linked to the input data - --isolator : specify isolator to get preprocessed data of DynPower model linked to the input data @@ -606,7 +608,7 @@ def estimate(args): """ def plot(args): - pipeline_name = DEFAULT_PIPELINE if not args.pipeline_name else args.pipeline_name + pipeline_name = default_train_output_pipeline if not args.pipeline_name else args.pipeline_name pipeline_path = get_pipeline_path(data_path, pipeline_name=pipeline_name) if not args.target_data: print("must give target 
data via --target-data to plot.") @@ -691,9 +693,13 @@ def plot(args): elif args.target_data == "metadata": for energy_source in energy_sources: data_filename = _pipeline_model_metadata_filename(energy_source, ot.name) - pipeline_path = get_pipeline_path(data_path, pipeline_name=pipeline_name) - model_metadata_df = load_pipeline_metadata(pipeline_path, energy_source, ot.name) + model_metadata_df = load_pipeline_metadata(args.input, energy_source, ot.name) metadata_plot(args, energy_source, model_metadata_df, output_folder, data_filename) + elif args.target_data == "power_curve": + for energy_source in energy_sources: + data_filename = _power_curve_filename(energy_source, ot.name) + model_metadata_df = load_pipeline_metadata(args.input, energy_source, ot.name) + power_curve_plot(args, data_path, energy_source, output_folder, data_filename) """ export @@ -709,6 +715,7 @@ def plot(args): - custom benchmark in json with `startTimeUTC` and `endTimeUTC` data - --collect-date : specify collection time manually in UTC - --input : specify kepler query response file (output of `query` function) - optional +- --zip : specify whether to zip pipeline """ def export(args): @@ -742,14 +749,18 @@ def export(args): pipeline_path = get_pipeline_path(data_path, pipeline_name=pipeline_name) local_export_path = exporter.export(data_path, pipeline_path, output_path, publisher=args.publisher, collect_date=collect_date, inputs=inputs) - args.target_data = "metadata" + args.input = local_export_path args.output = local_export_path - args.output_type = "AbsPower" args.energy_source = ",".join(PowerSourceMap.keys()) - plot(args) - args.output_type = "DynPower" - plot(args) + for target_data in ["metadata", "power_curve"]: + for ot in ModelOutputType: + args.target_data = target_data + args.output_type = ot.name + plot(args) + if args.zip: + import shutil + shutil.make_archive(local_export_path, 'zip', local_export_path) """ plot_scenario @@ -886,6 +897,7 @@ def plot_scenario(args): parser.add_argument("--publisher", type=str, help="Specify github account of model publisher") parser.add_argument("--include-raw", type=bool, help="Include raw query data") parser.add_argument("--collect-date", type=str, help="Specify collect date directly") + parser.add_argument("--zip", type=bool, help="Specify whether to zip pipeline", default=False) parser.add_argument("--id", type=str, help="specify machine id") diff --git a/manifests/base/patch/patch-estimator-sidecar.yaml b/manifests/base/patch/patch-estimator-sidecar.yaml index bea2ac6b..270af8bd 100644 --- a/manifests/base/patch/patch-estimator-sidecar.yaml +++ b/manifests/base/patch/patch-estimator-sidecar.yaml @@ -1,3 +1,4 @@ +# TODO: change to official path when kepler-model-db#22 merged apiVersion: v1 kind: ConfigMap metadata: @@ -6,9 +7,9 @@ metadata: data: MODEL_CONFIG: | NODE_COMPONENTS_ESTIMATOR=true - NODE_COMPONENTS_INIT_URL=https://raw.githubusercontent.com/sustainable-computing-io/kepler-model-db/main/models/v0.6/nx12/std_v0.6/rapl/AbsPower/BPFOnly/GradientBoostingRegressorTrainer_1.zip + NODE_COMPONENTS_INIT_URL=https://raw.githubusercontent.com/sustainable-computing-io/kepler-model-db/main/models/v0.7/ec2/intel_rapl/AbsPower/BPFOnly/GradientBoostingRegressorTrainer_0.zip NODE_TOTAL_ESTIMATOR=true - NODE_TOTAL_INIT_URL=https://raw.githubusercontent.com/sustainable-computing-io/kepler-model-db/main/models/v0.6/nx12/std_v0.6/acpi/AbsPower/BPFOnly/GradientBoostingRegressorTrainer_1.zip + 
NODE_TOTAL_INIT_URL=https://raw.githubusercontent.com/sustainable-computing-io/kepler-model-db/main/models/v0.7/specpower/acpi/AbsPower/BPFOnly/GradientBoostingRegressorTrainer_0.zip --- apiVersion: apps/v1 kind: DaemonSet diff --git a/manifests/kepler/patch/patch-ci.yaml b/manifests/kepler/patch/patch-ci.yaml index 206a639f..3ae1f510 100644 --- a/manifests/kepler/patch/patch-ci.yaml +++ b/manifests/kepler/patch/patch-ci.yaml @@ -4,7 +4,7 @@ metadata: name: kepler-cfm namespace: system data: - KEPLER_LOG_LEVEL: '3' + KEPLER_LOG_LEVEL: '4' --- apiVersion: apps/v1 kind: DaemonSet diff --git a/manifests/test/patch-estimator-sidecar.yaml b/manifests/test/patch-estimator-sidecar.yaml index d93e9883..6edb9b24 100644 --- a/manifests/test/patch-estimator-sidecar.yaml +++ b/manifests/test/patch-estimator-sidecar.yaml @@ -1,6 +1,6 @@ data: MODEL_CONFIG: | NODE_COMPONENTS_ESTIMATOR=true - NODE_COMPONENTS_INIT_URL=http://model-db.kepler.svc.cluster.local:8110/std_v0.7/intel_rapl/AbsPower/BPFOnly/GradientBoostingRegressorTrainer_1.zip + NODE_COMPONENTS_INIT_URL=http://model-db.kepler.svc.cluster.local:8110/std_v0.7/intel_rapl/AbsPower/BPFOnly/GradientBoostingRegressorTrainer_0.zip NODE_TOTAL_ESTIMATOR=true - NODE_TOTAL_INIT_URL=http://model-db.kepler.svc.cluster.local:8110/std_v0.7/acpi/AbsPower/BPFOnly/GradientBoostingRegressorTrainer_1.zip \ No newline at end of file + NODE_TOTAL_INIT_URL=http://model-db.kepler.svc.cluster.local:8110/std_v0.7/acpi/AbsPower/BPFOnly/GradientBoostingRegressorTrainer_0.zip \ No newline at end of file diff --git a/src/estimate/estimator.py b/src/estimate/estimator.py index fc464df6..cadaac26 100644 --- a/src/estimate/estimator.py +++ b/src/estimate/estimator.py @@ -61,7 +61,14 @@ def handle_request(data): if output_type.name not in loaded_model: loaded_model[output_type.name] = dict() output_path = "" - if power_request.energy_source not in loaded_model[output_type.name]: + request_trainer = True + if power_request.trainer_name is not None: + if output_type.name in loaded_model and power_request.energy_source in loaded_model[output_type.name]: + current_trainer = loaded_model[output_type.name][power_request.energy_source].trainer_name + request_trainer = current_trainer == power_request.trainer_name + if not request_trainer: + print("try obtaining the requesting trainer {} (current: {})".format(power_request.trainer_name, current_trainer)) + if power_request.energy_source not in loaded_model[output_type.name] or not request_trainer: output_path = get_download_output_path(download_path, power_request.energy_source, output_type) if not os.path.exists(output_path): # try connecting to model server @@ -77,7 +84,10 @@ def handle_request(data): print("load model from config: ", output_path) else: print("load model from model server: ", output_path) - loaded_model[output_type.name][power_request.energy_source] = load_downloaded_model(power_request.energy_source, output_type) + loaded_item = load_downloaded_model(power_request.energy_source, output_type) + if loaded_item is not None and loaded_item.estimator is not None: + loaded_model[output_type.name][power_request.energy_source] = loaded_item + print("set model {0} for {2} ({1})".format(loaded_item.model_name, output_type.name, power_request.energy_source)) # remove loaded model shutil.rmtree(output_path) diff --git a/src/estimate/model/curvefit_model.py b/src/estimate/model/curvefit_model.py index 9e376074..68d260e3 100644 --- a/src/estimate/model/curvefit_model.py +++ b/src/estimate/model/curvefit_model.py @@ -11,11 +11,14 
@@ src_path = os.path.join(os.path.dirname(__file__), '..', '..') sys.path.append(src_path) +train_path = os.path.join(os.path.dirname(__file__), '..', '..', 'train') +sys.path.append(train_path) + from util import ModelOutputType import collections.abc -class CurveFitModel(): +class CurveFitModelEstimator(): def __init__(self, model_path, model_name, output_type, model_file, features, fe_files, component_init=False, feature_group=None): self.name = model_name self.features = features @@ -30,9 +33,10 @@ def __init__(self, model_path, model_name, output_type, model_file, features, fe self.models = dict() model_info = load_model_by_json(model_path, model_file) for comp, model_metadata in model_info.items(): - model = CurveFitModel(model_path, self.name, self.output_type.name, model_metadata['model_file'], model_metadata['features'], model_metadata['fe_files'], component_init=True) + model = CurveFitModelEstimator(model_path, self.name, self.output_type.name, model_metadata['model_file'], model_metadata['features'], model_metadata['fe_files'], component_init=True) feature_index = main_feature(self.feauture_group.name, comp) - model.model.set_feature_index(feature_index) + if model.model is not None: + model.model.set_feature_index(feature_index) self.models[comp] = model else: self.model = load_model_by_pickle(model_path, model_file) diff --git a/src/estimate/model/keras_model.py b/src/estimate/model/keras_model.py index 077e973c..148b5538 100644 --- a/src/estimate/model/keras_model.py +++ b/src/estimate/model/keras_model.py @@ -10,7 +10,7 @@ from model_server_connector import ModelOutputType from estimate_common import load_model_by_pickle, load_model_by_keras, load_model_by_json, transform_and_predict, is_component_model -class KerasModel(): +class KerasModelEstimator(): def __init__(self, model_path, model_name, output_type, model_file, features, fe_files, component_init=False): self.name = model_name self.features = features @@ -20,7 +20,7 @@ def __init__(self, model_path, model_name, output_type, model_file, features, fe self.models = dict() model_info = load_model_by_json(model_path, model_file) for comp, model_metadata in model_info.items(): - model = KerasModel(model_path, self.name, self.output_type.name, model_metadata['model_file'], model_metadata['features'], model_metadata['fe_files'], component_init=True) + model = KerasModelEstimator(model_path, self.name, self.output_type.name, model_metadata['model_file'], model_metadata['features'], model_metadata['fe_files'], component_init=True) self.models[comp] = model else: self.model = load_model_by_keras(model_path, model_file) diff --git a/src/estimate/model/model.py b/src/estimate/model/model.py index d2a9f766..06b9c9a6 100644 --- a/src/estimate/model/model.py +++ b/src/estimate/model/model.py @@ -14,17 +14,17 @@ from config import download_path from prom_types import TIMESTAMP_COL, valid_container_query -from scikit_model import ScikitModel -from xgboost_model import XgboostModel -from curvefit_model import CurveFitModel -# from keras_model import KerasModel +from scikit_model import ScikitModelEstimator +from xgboost_model import XgboostModelEstimator +from curvefit_model import CurveFitModelEstimator +# from keras_model import KerasModelEstimator # model wrapper MODELCLASS = { - 'scikit': ScikitModel, - 'xgboost': XgboostModel, - 'curvefit': CurveFitModel - # 'keras': KerasModel, + 'scikit': ScikitModelEstimator, + 'xgboost': XgboostModelEstimator, + 'curvefit': CurveFitModelEstimator + # 'keras': KerasModelEstimator, } def 
default_predicted_col_func(energy_component): @@ -64,6 +64,7 @@ def __init__(self, model_path, model_class, model_name, output_type, model_file, abs_model=None, abs_mae=None, abs_mae_val=None, abs_mse=None, abs_mse_val=None, abs_max_corr=None, \ reconstructed_mae=None, reconstructed_mse=None, avg_mae=None, **kwargs): self.model_name = model_name + self.trainer_name = model_name.split("_")[0] self.estimator = MODELCLASS[model_class](model_path, model_name, output_type, model_file, features, fe_files) self.mae = mae self.mape = mape diff --git a/src/estimate/model/scikit_model.py b/src/estimate/model/scikit_model.py index 8e34b6d1..4bb6ed69 100644 --- a/src/estimate/model/scikit_model.py +++ b/src/estimate/model/scikit_model.py @@ -14,7 +14,7 @@ import collections.abc -class ScikitModel(): +class ScikitModelEstimator(): def __init__(self, model_path, model_name, output_type, model_file, features, fe_files, component_init=False): self.name = model_name self.features = features @@ -25,7 +25,7 @@ def __init__(self, model_path, model_name, output_type, model_file, features, fe self.models = dict() model_info = load_model_by_json(model_path, model_file) for comp, model_metadata in model_info.items(): - model = ScikitModel(model_path, self.name, self.output_type.name, model_metadata['model_file'], model_metadata['features'], model_metadata['fe_files'], component_init=True) + model = ScikitModelEstimator(model_path, self.name, self.output_type.name, model_metadata['model_file'], model_metadata['features'], model_metadata['fe_files'], component_init=True) self.models[comp] = model else: self.model = load_model_by_pickle(model_path, model_file) diff --git a/src/estimate/model/xgboost_model.py b/src/estimate/model/xgboost_model.py index 26255250..441e7c12 100644 --- a/src/estimate/model/xgboost_model.py +++ b/src/estimate/model/xgboost_model.py @@ -14,7 +14,7 @@ import collections.abc -class XgboostModel(): +class XgboostModelEstimator(): def __init__(self, model_path, model_name, output_type, model_file, features, fe_files, component_init=False): self.name = model_name self.features = features @@ -25,7 +25,7 @@ def __init__(self, model_path, model_name, output_type, model_file, features, fe self.models = dict() model_info = load_model_by_json(model_path, model_file) for comp, model_metadata in model_info.items(): - model = XgboostModel(model_path, self.name, self.output_type.name, model_metadata['model_file'], model_metadata['features'], model_metadata['fe_files'], component_init=True) + model = XgboostModelEstimator(model_path, self.name, self.output_type.name, model_metadata['model_file'], model_metadata['features'], model_metadata['fe_files'], component_init=True) self.models[comp] = model else: filepath = os.path.join(model_path, model_file) diff --git a/src/estimate/model_server_connector.py b/src/estimate/model_server_connector.py index 3759f7ca..ef130309 100644 --- a/src/estimate/model_server_connector.py +++ b/src/estimate/model_server_connector.py @@ -13,8 +13,19 @@ from loader import get_download_output_path from train_types import ModelOutputType +# discover_spec: determine node spec in json format (refer to NodeTypeSpec) +def discover_spec(): + import psutil + # TODO: reuse node_type_index/generate_spec with loosen selection + cores = psutil.cpu_count(logical=True) + spec = { + "cores": cores + } + return spec +node_spec = discover_spec() + def make_model_request(power_request): - return {"metrics": power_request.metrics + power_request.system_features, "output_type": 
power_request.output_type, "source": power_request.energy_source, "filter": power_request.filter, "trainer_name": power_request.trainer_name} + return {"metrics": power_request.metrics + power_request.system_features, "output_type": power_request.output_type, "source": power_request.energy_source, "filter": power_request.filter, "trainer_name": power_request.trainer_name, "spec": node_spec} TMP_FILE = 'tmp.zip' diff --git a/src/server/model_server.py b/src/server/model_server.py index 863695e6..af5bb72d 100644 --- a/src/server/model_server.py +++ b/src/server/model_server.py @@ -2,7 +2,6 @@ import os import sys -import logging import codecs import shutil import requests @@ -12,15 +11,17 @@ sys.path.append(src_path) sys.path.append(util_path) -from util.train_types import get_valid_feature_groups, ModelOutputType, FeatureGroups, FeatureGroup, weight_support_trainers -from util.config import getConfig, model_toppath, ERROR_KEY, MODEL_SERVER_MODEL_REQ_PATH, MODEL_SERVER_MODEL_LIST_PATH, initial_pipeline_url, download_path -from util.loader import parse_filters, is_valid_model, load_json, load_weight, get_model_group_path, get_archived_file, METADATA_FILENAME, CHECKPOINT_FOLDERNAME, get_pipeline_path, any_node_type, is_matched_type +from util.train_types import get_valid_feature_groups, ModelOutputType, FeatureGroups, FeatureGroup, PowerSourceMap, weight_support_trainers +from util.config import getConfig, model_toppath, ERROR_KEY, MODEL_SERVER_MODEL_REQ_PATH, MODEL_SERVER_MODEL_LIST_PATH, initial_pipeline_urls, download_path +from util.loader import parse_filters, is_valid_model, load_json, load_weight, get_model_group_path, get_archived_file, METADATA_FILENAME, CHECKPOINT_FOLDERNAME, get_pipeline_path, any_node_type, is_matched_type, get_largest_candidates +from util.saver import WEIGHT_FILENAME +from train import NodeTypeSpec, NodeTypeIndexCollection ############################################### # model request class ModelRequest(): - def __init__(self, metrics, output_type, source='intel_rapl', node_type=-1, weight=False, trainer_name="", filter=""): + def __init__(self, metrics, output_type, source='intel_rapl', node_type=-1, weight=False, trainer_name="", filter="", pipeline_name="", spec=None): # target source of power metric to be predicted (e.g., intel_rapl, acpi) self.source = source # type of node to select a model learned from similar nodes (default: -1, applied universal model learned by all node_type (TODO)) @@ -35,6 +36,12 @@ def __init__(self, metrics, output_type, source='intel_rapl', node_type=-1, weig self.output_type = output_type # whether requesting just a linear regression weight or any model archive file (default: False, an archive file of any model) self.weight = weight + # specific pipeline (default: empty, selecting default pipeline) + self.pipeline_name = pipeline_name + # spec of requesting node to determine node_type + self.spec = NodeTypeSpec() + if spec is not None: + self.spec = NodeTypeSpec(**spec) ########################################### @@ -42,7 +49,23 @@ def __init__(self, metrics, output_type, source='intel_rapl', node_type=-1, weig MODEL_SERVER_PORT = getConfig('MODEL_SERVER_PORT', MODEL_SERVER_PORT) MODEL_SERVER_PORT = int(MODEL_SERVER_PORT) -def select_best_model(valid_groupath, filters, trainer_name="", node_type=any_node_type, weight=False): +# pipelineName and nodeCollection are global dict values set at initial state (load_init_pipeline) +## pipelineName: map of energy_source to target pipeline name +pipelineName = dict() +## nodeCollection: 
map of pipeline_name to its node_collection, used for determining covering node_type of requesting node spec +nodeCollection = dict() + +""" +select_best_model: +1. list model_names from valid_grouppath (determined by valid features) +2. filter weight-supported model if requesting for model weight +3. filter matched type by requesting node_type or node_collection over node spec +4. if no candidate left, list model with largest number of cores +5. if fail to list, use all models from step 2 +7. for each model, check validity and load +8. return the best model (lowest error) +""" +def select_best_model(spec, valid_groupath, filters, energy_source, pipeline_name="", trainer_name="", node_type=any_node_type, weight=False): model_names = [f for f in os.listdir(valid_groupath) if \ f != CHECKPOINT_FOLDERNAME \ and not os.path.isfile(os.path.join(valid_groupath,f)) \ @@ -52,23 +75,36 @@ def select_best_model(valid_groupath, filters, trainer_name="", node_type=any_no # Load metadata of trainers best_cadidate = None best_response = None + candidates = [] for model_name in model_names: - if not is_matched_type(model_name, node_type): + if not is_matched_type(nodeCollection, spec, pipeline_name, model_name, node_type, energy_source): continue + candidates += [model_name] + if len(model_names) > 0 and len(candidates) == 0: + # loosen all spec + candidates = get_largest_candidates(model_names, pipeline_name, nodeCollection, energy_source) + print("no matched models, select from large candidates: ", candidates) + if candidates is None: + print("no large candidates, select from all availables") + candidates = model_names + for model_name in candidates: model_savepath = os.path.join(valid_groupath, model_name) metadata = load_json(model_savepath, METADATA_FILENAME) if metadata is None or not is_valid_model(metadata, filters) or ERROR_KEY not in metadata: # invalid metadata + print("invalid", is_valid_model(metadata, filters), metadata) continue if weight: response = load_weight(model_savepath) if response is None: # fail to get weight file + print("weight failed", model_savepath) continue else: response = get_archived_file(valid_groupath, model_name) if not os.path.exists(response): # archived model file does not exists + print("archived failed", response) continue if best_cadidate is None or best_cadidate[ERROR_KEY] > metadata[ERROR_KEY]: best_cadidate = metadata @@ -77,7 +113,7 @@ def select_best_model(valid_groupath, filters, trainer_name="", node_type=any_no app = Flask(__name__) -# return archive file or LR weight based on request (req) +# get_model: return archive file or LR weight based on request (req) @app.route(MODEL_SERVER_MODEL_REQ_PATH, methods=['POST']) def get_model(): model_request = request.get_json() @@ -97,9 +133,9 @@ def get_model(): best_response = None # find best model comparing best candidate from each valid feature group complied with filtering conditions for fg in valid_fgs: - valid_groupath = get_model_group_path(model_toppath, output_type, fg, energy_source) + valid_groupath = get_model_group_path(model_toppath, output_type, fg, energy_source, pipeline_name=pipelineName[energy_source]) if os.path.exists(valid_groupath): - best_candidate, response = select_best_model(valid_groupath, filters, req.trainer_name, req.node_type, req.weight) + best_candidate, response = select_best_model(req.spec, valid_groupath, filters, energy_source, req.pipeline_name, req.trainer_name, req.node_type, req.weight) if best_candidate is None: continue if best_model is None or best_model[ERROR_KEY] > 
best_candidate[ERROR_KEY]: @@ -123,7 +159,7 @@ def get_model(): except ValueError as err: return make_response("send archived model error: {}".format(err), 400) -# return name list of best-candidate pipelines +# get_available_models: return name list of best-candidate pipelines @app.route(MODEL_SERVER_MODEL_LIST_PATH, methods=['GET']) def get_available_models(): fg = request.args.get('fg') @@ -154,9 +190,9 @@ def get_available_models(): for output_type in output_types: model_names[output_type.name] = dict() for fg in valid_fgs: - valid_groupath = get_model_group_path(model_toppath, output_type, fg, energy_source) + valid_groupath = get_model_group_path(model_toppath, output_type, fg, energy_source, pipeline_name=pipelineName[energy_source]) if os.path.exists(valid_groupath): - best_candidate, _ = select_best_model(valid_groupath, filters) + best_candidate, _ = select_best_model(None, valid_groupath, filters, energy_source) if best_candidate is None: continue model_names[output_type.name][fg.name] = best_candidate['model_name'] @@ -169,34 +205,65 @@ def get_available_models(): except (ValueError, Exception) as err: return make_response("failed to get best model list: {}".format(err), 400) +# upack_zip_files: unpack all model.zip files to model folder and copy model.json to model/weight.zip +def unpack_zip_files(root_folder): + # Walk through all folders and subfolders + for folder, _, files in os.walk(root_folder): + for file in files: + if file.endswith('.zip'): + zip_file_path = os.path.join(folder, file) + extract_to = os.path.splitext(zip_file_path)[0] # Extract to same location as the ZIP file + # Make sure the destination folder exists, if not, create it + if not os.path.exists(extract_to): + os.makedirs(extract_to) + shutil.unpack_archive(zip_file_path, extract_to) + weight_file = os.path.join(folder, file.replace(".zip", ".json")) + if os.path.exists(weight_file): + shutil.copy(weight_file, os.path.join(extract_to, WEIGHT_FILENAME + ".json")) + +# set_pipelines: set global pipeline variables, nodeCollection and pipelineName +def set_pipelines(): + pipeline_names = [f for f in os.listdir(model_toppath) if os.path.exists(os.path.join(model_toppath, f, METADATA_FILENAME + ".json"))] + for pipeline_name in pipeline_names: + pipeline_path = get_pipeline_path(model_toppath, pipeline_name=pipeline_name) + global nodeCollection + nodeCollection[pipeline_name] = NodeTypeIndexCollection(pipeline_path) + print("initial pipeline is loaded to {}".format(pipeline_path)) + for energy_source in PowerSourceMap.keys(): + if os.path.exists(os.path.join(pipeline_path, energy_source)): + pipelineName[energy_source] = pipeline_name + print("set pipeline {} for {}".format(pipeline_name, energy_source)) + +# load_init_pipeline: load pipeline from URLs and set pipeline variables def load_init_pipeline(): - print("try downloading archieved pipeline from URL: {}".format(initial_pipeline_url)) - response = requests.get(initial_pipeline_url) - print(response) - if response.status_code != 200: - print("failed to download archieved pipeline.") - return - - # delete existing default pipeline - default_pipeline = get_pipeline_path(model_toppath) - if os.path.exists(default_pipeline): - shutil.rmtree(default_pipeline) - os.mkdir(default_pipeline) - - # unpack pipeline - try: - TMP_FILE = 'tmp.zip' - tmp_filepath = os.path.join(download_path, TMP_FILE) - with codecs.open(tmp_filepath, 'wb') as f: - f.write(response.content) - shutil.unpack_archive(tmp_filepath, default_pipeline) - except Exception as e: - 
print("failed to unpack downloaded pipeline: ", e) - return - - # remove downloaded zip - os.remove(tmp_filepath) - print("initial pipeline is loaded to {}".format(default_pipeline)) + for initial_pipeline_url in initial_pipeline_urls: + print("try downloading archieved pipeline from URL: {}".format(initial_pipeline_url)) + response = requests.get(initial_pipeline_url) + print(response) + if response.status_code != 200: + print("failed to download archieved pipeline.") + return + # delete existing default pipeline + basename = os.path.basename(initial_pipeline_url) + pipeline_name = basename.split(".zip")[0] + pipeline_path = get_pipeline_path(model_toppath, pipeline_name=pipeline_name) + if os.path.exists(pipeline_path): + shutil.rmtree(pipeline_path) + os.mkdir(pipeline_path) + # unpack pipeline + try: + filename = basename + tmp_filepath = os.path.join(download_path, filename) + with codecs.open(tmp_filepath, 'wb') as f: + f.write(response.content) + shutil.unpack_archive(tmp_filepath, pipeline_path) + unpack_zip_files(pipeline_path) + except Exception as e: + print("failed to unpack downloaded pipeline: ", e) + return + # remove downloaded zip + os.remove(tmp_filepath) + set_pipelines() if __name__ == '__main__': load_init_pipeline() diff --git a/src/train/ec2_pipeline.py b/src/train/ec2_pipeline.py new file mode 100644 index 00000000..6c8e8c05 --- /dev/null +++ b/src/train/ec2_pipeline.py @@ -0,0 +1,87 @@ +""" +python src/train/ec2_pipeline.py + +This program trains a pipeline called `ec2` from the collected data on AWS COS which is collected by kepler-model-training-playbook or by collect-data job on github workflow. + +required step: +- set AWS secret environments (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION) +""" + +import os +import sys +import json + +from profiler.node_type_index import NodeTypeSpec, NodeAttribute + +cur_path = os.path.join(os.path.dirname(__file__), '.') +sys.path.append(cur_path) +util_path = os.path.join(os.path.dirname(__file__), '..', 'util') +sys.path.append(util_path) +extractor_path = os.path.join(os.path.dirname(__file__), 'extractor') +sys.path.append(extractor_path) +isolator_path = os.path.join(os.path.dirname(__file__), 'isolator') +sys.path.append(isolator_path) +profiler_path = os.path.join(os.path.dirname(__file__), 'profiler') +sys.path.append(profiler_path) + +from pipeline import NewPipeline +from extractor import DefaultExtractor +from isolator import MinIdleIsolator +from prom_types import node_info_column, prom_responses_to_results, get_valid_feature_group_from_queries +from train_types import default_trainer_names, PowerSourceMap + +node_profiles = ["m5zn.metal", "c5d.metal", "i3en.metal", "m7i.metal-24xl", "i3.metal"] +node_image = "ami-0e4d0bb9670ea8db0" + +import boto3 + +aws_access_key_id = os.environ["AWS_ACCESS_KEY_ID"] +aws_secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"] +region_name = os.environ["AWS_REGION"] + +# Initialize the S3 client +s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name=region_name) +bucket_name = 'kepler-power-model' +unknown = -1 +def read_response_in_json(key): + print(key) + response = s3.get_object(Bucket=bucket_name, Key=key) + return json.loads(response['Body'].read().decode('utf-8')) + +class Ec2PipelineRun(): + def __init__(self, name, abs_trainer_names=default_trainer_names, dyn_trainer_names=default_trainer_names, isolator=MinIdleIsolator()): + self.energy_source = "intel_rapl" + self.energy_components = 
PowerSourceMap[self.energy_source] + self.pipeline = NewPipeline(name, abs_trainer_names=abs_trainer_names, dyn_trainer_names=dyn_trainer_names, extractor=DefaultExtractor(), isolator=isolator, target_energy_sources=[self.energy_source]) + + def process(self): + for profile in node_profiles: + machine_id = "-".join([profile, node_image]) + kepler_query_key = os.path.join("/", machine_id, "data", "kepler_query.json") + spec_key = os.path.join("/", machine_id, "data", "machine_spec", machine_id + ".json") + query_response = read_response_in_json(kepler_query_key) + spec_json = read_response_in_json(spec_key) + spec = NodeTypeSpec(**spec_json['attrs']) + spec.attrs[NodeAttribute.PROCESSOR] = profile + spec.attrs[NodeAttribute.MEMORY] = unknown + spec.attrs[NodeAttribute.FREQ] = unknown + node_type = self.pipeline.node_collection.index_train_machine(machine_id, spec) + query_results = prom_responses_to_results(query_response) + query_results[node_info_column] = node_type + valid_fg = get_valid_feature_group_from_queries([query for query in query_response.keys() if len(query_response[query]) > 1 ]) + for feature_group in valid_fg: + self.pipeline.process(query_results, self.energy_components, self.energy_source, feature_group=feature_group.name, replace_node_type=node_type) + self.pipeline.node_collection.save() + + def save_metadata(self): + self.pipeline.save_metadata() + + def archive_pipeline(self): + self.pipeline.archive_pipeline() + +if __name__ == "__main__": + pipeline_name = "ec2" + pipelinerun = Ec2PipelineRun(name=pipeline_name) + pipelinerun.process() + pipelinerun.save_metadata() + pipelinerun.archive_pipeline() \ No newline at end of file diff --git a/src/train/exporter/validator.py b/src/train/exporter/validator.py index 3a3d4092..8ca064c8 100644 --- a/src/train/exporter/validator.py +++ b/src/train/exporter/validator.py @@ -13,8 +13,11 @@ from loader import load_pipeline_metadata, get_model_group_path, load_weight, get_archived_file from saver import save_json +# mae and mape are exclusive thresholds to balance between absolute error value and relative error value mae_threshold = 10 mape_threshold = 20 +# hard_threshold is a hard threshold to avoid too-small value on absolute error and too-large value on relative error +hard_threshold = 100 class ExportModel(): def __init__(self, models_path, output_type, feature_group, energy_source, pipeline_name, model_name, metadata): @@ -99,6 +102,8 @@ def get_validated_export_items(pipeline_path, pipeline_name): continue valid_rows = [] for _, row in metadata_df.iterrows(): + if row['mae'] > hard_threshold or row['mape'] > hard_threshold: + continue if row['mape'] <= mape_threshold or row['mae'] <= mae_threshold: model_name = row["model_name"] fg = FeatureGroup[row["feature_group"]] diff --git a/src/train/exporter/writer.py b/src/train/exporter/writer.py index bff14b5c..b830070a 100644 --- a/src/train/exporter/writer.py +++ b/src/train/exporter/writer.py @@ -7,7 +7,7 @@ sys.path.append(util_path) from loader import load_json, version -from saver import assure_path, _pipeline_model_metadata_filename +from saver import assure_path, _pipeline_model_metadata_filename, _power_curve_filename from validator import mae_threshold, mape_threshold from train_types import ModelOutputType, PowerSourceMap @@ -220,6 +220,15 @@ def generate_pipeline_readme(pipeline_name, local_export_path, node_type_index_j markdown_content += "![]({}.png)\n".format(data_filename) markdown_content += data_to_markdown_table(df.sort_values(by=["node type"])) + # add power 
curve figures + for ot in ModelOutputType: + for energy_source in PowerSourceMap.keys(): + data_filename = _power_curve_filename(energy_source, ot.name) + png_filename = data_filename + ".png" + if os.path.exists(os.path.join(local_export_path, png_filename)): + markdown_content += "## {} ({})\n".format(energy_source, ot.name) + markdown_content += "![]({})\n".format(png_filename) + write_markdown(markdown_filepath, markdown_content) return markdown_filepath diff --git a/src/train/isolator/train_isolator.py b/src/train/isolator/train_isolator.py index 2525ca89..3af23b1e 100644 --- a/src/train/isolator/train_isolator.py +++ b/src/train/isolator/train_isolator.py @@ -18,7 +18,7 @@ from util.prom_types import TIMESTAMP_COL, get_container_name_from_id from util.extract_types import container_level_index, container_id_colname, col_to_component from util.config import model_toppath -from util.loader import list_all_abs_models, DEFAULT_PIPELINE +from util.loader import list_all_abs_models, default_train_output_pipeline def is_better(curr_min_err, err, curr_max_corr, corr, corr_threshold=0.7): if curr_min_err is None: @@ -33,7 +33,7 @@ def is_better(curr_min_err, err, curr_max_corr, corr, corr_threshold=0.7): return err < curr_min_err return False -def get_abs_models(workload_feature_cols, energy_source, toppath=model_toppath, pipeline_name=DEFAULT_PIPELINE): +def get_abs_models(workload_feature_cols, energy_source, toppath=model_toppath, pipeline_name=default_train_output_pipeline): # from abs_model_path # find valid_feature_groups # list_model_names @@ -81,7 +81,7 @@ def get_target_data_with_dyn_power(model, energy_components, extracted_power_lab return target_data_with_dyn_power, sum_background_data_with_prediction # traverse all abs_model with minimum mae for each energy_source -def find_best_target_data_with_dyn_power(energy_source, energy_components, extracted_data, background_containers, label_cols, toppath=model_toppath, pipeline_name=DEFAULT_PIPELINE): +def find_best_target_data_with_dyn_power(energy_source, energy_components, extracted_data, background_containers, label_cols, toppath=model_toppath, pipeline_name=default_train_output_pipeline): workload_feature_cols = [col for col in extracted_data.columns if col not in label_cols and col not in container_level_index and 'ratio' not in col and 'node' not in col] curr_min_err = None curr_max_corr= None @@ -126,7 +126,7 @@ def get_background_container_from_bg_hints(data, bg_hints): # TO-DO: suppport multiple node types class TrainIsolator(Isolator): - def __init__(self, idle_data=None, profiler=None, target_hints=[], bg_hints=[], abs_pipeline_name=DEFAULT_PIPELINE): + def __init__(self, idle_data=None, profiler=None, target_hints=[], bg_hints=[], abs_pipeline_name=default_train_output_pipeline): if profiler is not None and idle_data is not None: self.idle_data = idle_data self.profiles = profiler.process(self.idle_data) diff --git a/src/train/offline_trainer.py b/src/train/offline_trainer.py index caa9e1b6..380dae9f 100644 --- a/src/train/offline_trainer.py +++ b/src/train/offline_trainer.py @@ -26,7 +26,7 @@ sys.path.append(isolator_path) from config import model_toppath -from loader import get_pipeline_path, DEFAULT_PIPELINE +from loader import get_pipeline_path, default_pipelines from train_types import PowerSourceMap from prom_types import get_valid_feature_group_from_queries, prom_responses_to_results from profiler.profiler import Profiler, generate_profiles @@ -81,7 +81,7 @@ def init_isolator(self, profiler, profiles, idle_data): elif 
isolator_key == TrainIsolator.__name__: if 'abs_pipeline_name' not in isolator_args: # use default pipeline for absolute model training in isolation - isolator_args['abs_pipeline_name'] = DEFAULT_PIPELINE + isolator_args['abs_pipeline_name'] = default_pipelines[self.energy_source] isolator = TrainIsolator(idle_data, profiler=profiler, abs_pipeline_name=isolator_args['abs_pipeline_name']) else: module_path = importlib.import_module('isolator') diff --git a/src/train/online_trainer.py b/src/train/online_trainer.py index 31900ec4..fa3bdc3b 100644 --- a/src/train/online_trainer.py +++ b/src/train/online_trainer.py @@ -15,7 +15,7 @@ from prom_query import PrometheusClient from prom_types import get_valid_feature_group_from_queries, PROM_QUERY_INTERVAL from config import getConfig -from loader import DEFAULT_PIPELINE +from loader import default_train_output_pipeline SAMPLING_INTERVAL = PROM_QUERY_INTERVAL SAMPLING_INTERVAL = getConfig('SAMPLING_INTERVAL', SAMPLING_INTERVAL) @@ -36,8 +36,8 @@ def initial_pipelines(): target_energy_sources = PowerSourceMap.keys() valid_feature_groups = FeatureGroups.keys() profiles = load_all_profiles() - profile_pipeline = NewPipeline(DEFAULT_PIPELINE, abs_trainer_names, dyn_trainer_names, extractor=DefaultExtractor(), isolator=ProfileBackgroundIsolator(profiles), target_energy_sources=target_energy_sources, valid_feature_groups=valid_feature_groups) - non_profile_pipeline = NewPipeline(DEFAULT_PIPELINE, abs_trainer_names, dyn_trainer_names, extractor=DefaultExtractor(), isolator=MinIdleIsolator(), target_energy_sources=target_energy_sources, valid_feature_groups=valid_feature_groups) + profile_pipeline = NewPipeline(default_train_output_pipeline, abs_trainer_names, dyn_trainer_names, extractor=DefaultExtractor(), isolator=ProfileBackgroundIsolator(profiles), target_energy_sources=target_energy_sources, valid_feature_groups=valid_feature_groups) + non_profile_pipeline = NewPipeline(default_train_output_pipeline, abs_trainer_names, dyn_trainer_names, extractor=DefaultExtractor(), isolator=MinIdleIsolator(), target_energy_sources=target_energy_sources, valid_feature_groups=valid_feature_groups) return profile_pipeline, non_profile_pipeline if __name__ == '__main__': diff --git a/src/train/pipeline.py b/src/train/pipeline.py index 60874dcb..6fc1c508 100644 --- a/src/train/pipeline.py +++ b/src/train/pipeline.py @@ -121,10 +121,8 @@ def _train(self, abs_data, dyn_data, power_labels, energy_source, feature_group) futures = [] for trainer in self.trainers: if trainer.feature_group_name != feature_group: - print("Skip feature: ", feature_group) continue if trainer.energy_source != energy_source: - print("Skip energy source: ", energy_source) continue if trainer.node_level and abs_data is not None: future = executor.submit(run_train, trainer, abs_data, power_labels, pipeline_lock=self.lock) @@ -134,6 +132,12 @@ def _train(self, abs_data, dyn_data, power_labels, energy_source, feature_group) futures += [future] self.print_log('Waiting for {} trainers to complete...'.format(len(futures))) wait(futures) + # Handle exceptions if any + for future in futures: + if future.exception() is not None: + # Handle the exception here + print(f"Exception occurred: {future.exception()}") + self.print_log('{}/{} trainers are trained from {} to {}'.format(len(futures), len(self.trainers), feature_group, energy_source)) diff --git a/src/train/profiler/node_type_index.py b/src/train/profiler/node_type_index.py index 44b23661..35a38957 100644 --- a/src/train/profiler/node_type_index.py +++ 
b/src/train/profiler/node_type_index.py @@ -121,13 +121,23 @@ def add_member(self, machine_id): def get_size(self): return len(self.members) + def get_cores(self): + return self.attrs[NodeAttribute.CORES] + # check the comparing node-type spec is covered by this node-type spec def cover(self, compare_spec): if not isinstance(compare_spec, NodeTypeSpec): return False for attr in NodeAttribute: - if compare_spec.attrs[attr] is not None and str(self.attrs[attr]) != str(compare_spec.attrs[attr]): - return False + if compare_spec.attrs[attr] is not None: + try: + # Attempt to convert values to floats + if float(self.attrs[attr]) != float(compare_spec.attrs[attr]): + return False + except ValueError: + # If conversion to float fails, compare as strings + if self.attrs[attr] != compare_spec.attrs[attr]: + return False return True def __str__(self): diff --git a/src/train/specpower_pipeline.py b/src/train/specpower_pipeline.py new file mode 100644 index 00000000..1760bfc6 --- /dev/null +++ b/src/train/specpower_pipeline.py @@ -0,0 +1,127 @@ +""" +python src/train/specpower_pipeline.py + +This program trains a pipeline called `specpower` from the preprocssed data from SPECPower database. + +required step: +- run the following command to serve kepler_spec_power_db + docker run -it -p 8080:80 quay.io/sustainability/kepler_spec_power_db:v0.7 +""" + +import requests +from io import StringIO +import os +import sys +import pandas as pd +import json +import datetime + +from profiler.node_type_index import NodeTypeSpec + +cur_path = os.path.join(os.path.dirname(__file__), '.') +sys.path.append(cur_path) +util_path = os.path.join(os.path.dirname(__file__), '..', 'util') +sys.path.append(util_path) +extractor_path = os.path.join(os.path.dirname(__file__), 'extractor') +sys.path.append(extractor_path) +isolator_path = os.path.join(os.path.dirname(__file__), 'isolator') +sys.path.append(isolator_path) +profiler_path = os.path.join(os.path.dirname(__file__), 'profiler') +sys.path.append(profiler_path) + +from pipeline import NewPipeline +from extractor import DefaultExtractor +from isolator import MinIdleIsolator +from format import time_to_str +from prom_types import node_info_column, TIMESTAMP_COL +from extract_types import component_to_col +from train_types import FeatureGroup, default_trainer_names, BPF_FEATURES, PowerSourceMap + +platform_energy_source = "acpi" +acpi_component = PowerSourceMap[platform_energy_source][0] +acpi_label = component_to_col(acpi_component) + +def read_csv_from_url(topurl, path): + response = requests.get(os.path.join(topurl, path)) + if response.status_code == 200: + try: + csv_data = StringIO(response.text) + df = pd.read_csv(csv_data) + return df + except Exception as e: + print("cannot read csv from url: ", e) + return None + print("cannot read csv from url: ", response.status_code) + return None + +def read_json_from_url(topurl, path): + response = requests.get(os.path.join(topurl, path)) + if response.status_code == 200: + try: + json_data = json.loads(response.text) + return json_data + except Exception as e: + print("cannot read json from url: ", e) + return None + print("cannot read json from url: ", response.status_code) + return None + +def get_machine_spec(df): + if len(df) == 0: + return NodeTypeSpec() + data_spec = df.iloc[0].to_dict() + spec = NodeTypeSpec(**data_spec) + return spec + +class SpecPipelineRun(): + def __init__(self, name, abs_trainer_names=default_trainer_names, dyn_trainer_names=default_trainer_names, isolator=MinIdleIsolator()): + self.feature_group 
= FeatureGroup.BPFOnly + self.power_labels = [acpi_label] + self.energy_source = platform_energy_source + # extractor is not used + self.pipeline = NewPipeline(name, abs_trainer_names=abs_trainer_names, dyn_trainer_names=dyn_trainer_names, extractor=DefaultExtractor(), isolator=isolator, target_energy_sources=[self.energy_source], valid_feature_groups=[self.feature_group]) + + def load_spec_data(self, spec_db_url): + spec_extracted_data = dict() + # load index.json + file_indexes = read_json_from_url(spec_db_url, "index.json") + if file_indexes is not None: + for filename in file_indexes: + machine_id, _ = os.path.splitext(filename) + df = read_csv_from_url(spec_db_url, filename) + if df is not None: + # find node_type + spec = get_machine_spec(df) + node_type = self.pipeline.node_collection.index_train_machine(machine_id, spec) + df[node_info_column] = node_type + # select only needed column + spec_extracted_data[machine_id] = df[[TIMESTAMP_COL, node_info_column, acpi_label] + BPF_FEATURES ] + self.pipeline.node_collection.save() + return spec_extracted_data + + def process(self, spec_db_url): + spec_extracted_data = self.load_spec_data(spec_db_url) + abs_data = pd.concat(spec_extracted_data.values(), ignore_index=True) + df_list = [] + for df in spec_extracted_data.values(): + isolated_df = self.pipeline.isolator.isolate(df, label_cols=self.power_labels, energy_source=self.energy_source) + df_list += [isolated_df] + dyn_data = pd.concat(df_list, ignore_index=True) + self.pipeline._train(abs_data, dyn_data, self.power_labels, self.energy_source, self.feature_group.name) + self.pipeline.print_pipeline_process_end(self.energy_source, self.feature_group.name, abs_data, dyn_data) + self.pipeline.metadata["last_update_time"] = time_to_str(datetime.datetime.utcnow()) + return True, abs_data, dyn_data + + def save_metadata(self): + self.pipeline.save_metadata() + + def archive_pipeline(self): + self.pipeline.archive_pipeline() + +if __name__ == "__main__": + spec_db_url = "http://localhost:8080" + pipeline_name = "specpower" + pipelinerun = SpecPipelineRun(name=pipeline_name) + _, abs_data, dyn_data = pipelinerun.process(spec_db_url) + pipelinerun.save_metadata() + pipelinerun.archive_pipeline() \ No newline at end of file diff --git a/src/train/trainer/SVRRegressorTrainer/main.py b/src/train/trainer/SVRRegressorTrainer/main.py index e4f6dbd4..632a064c 100644 --- a/src/train/trainer/SVRRegressorTrainer/main.py +++ b/src/train/trainer/SVRRegressorTrainer/main.py @@ -16,5 +16,4 @@ def __init__(self, energy_components, feature_group, energy_source, node_level, self.fe_files = [] def init_model(self): - print("scaler:", self.node_scalers.keys()) - return make_pipeline(self.node_scalers[common_node_type], SVR(C=1.0, epsilon=0.2)) \ No newline at end of file + return SVR(C=1.0, epsilon=0.2) \ No newline at end of file diff --git a/src/util/__init__.py b/src/util/__init__.py index 0f7e00f0..0c570f75 100644 --- a/src/util/__init__.py +++ b/src/util/__init__.py @@ -4,7 +4,7 @@ sys.path.append(cur_path) # commonly-used definitions -from loader import load_json, load_csv, load_pkl, load_metadata, load_scaler, load_weight, load_remote_pkl, list_model_names, DEFAULT_PIPELINE, class_to_json, version +from loader import load_json, load_csv, load_pkl, load_metadata, load_scaler, load_weight, load_remote_pkl, list_model_names, default_train_output_pipeline, class_to_json, version from saver import assure_path, save_csv, save_json, save_pkl, save_metadata, save_scaler, save_weight from config import getConfig, 
model_toppath from prom_types import get_valid_feature_group_from_queries diff --git a/src/util/config.py b/src/util/config.py index c13a1e13..460f660d 100644 --- a/src/util/config.py +++ b/src/util/config.py @@ -13,8 +13,8 @@ ################################################# import os -from loader import get_url, get_pipeline_url, default_init_model_url -from train_types import ModelOutputType, is_support_output_type +from loader import get_url, get_pipeline_url, base_model_url, default_pipelines, default_train_output_pipeline +from train_types import ModelOutputType, is_support_output_type, FeatureGroup # must be writable (for shared volume mount) MNT_PATH = "/mnt" @@ -63,8 +63,15 @@ def getPath(subpath): CONFIG_PATH = getConfig('CONFIG_PATH', CONFIG_PATH) -model_topurl = getConfig('MODEL_TOPURL', default_init_model_url) -initial_pipeline_url = getConfig('INITIAL_PIPELINE_URL', get_pipeline_url(model_topurl=model_topurl)) +model_topurl = getConfig('MODEL_TOPURL', base_model_url) +initial_pipeline_urls = getConfig('INITIAL_PIPELINE_URL', "") +if initial_pipeline_urls == "": + if model_topurl == base_model_url: + initial_pipeline_urls = [get_pipeline_url(model_topurl=model_topurl, pipeline_name=pipeline_name) for pipeline_name in default_pipelines.values()] + else: + initial_pipeline_urls = [get_pipeline_url(model_topurl=model_topurl, pipeline_name=default_train_output_pipeline)] +else: + initial_pipeline_urls = initial_pipeline_urls.split(",") model_toppath = getConfig('MODEL_PATH', getPath(MODEL_FOLDERNAME)) download_path = getConfig('MODEL_PATH', getPath(DOWNLOAD_FOLDERNAME)) @@ -120,13 +127,17 @@ def get_energy_source(prefix): # get_init_model_url: get initial model from URL if estimator is enabled def get_init_model_url(energy_source, output_type, model_topurl=model_topurl): + if base_model_url == model_topurl: + pipeline_name = default_pipelines[energy_source] + else: + pipeline_name = default_train_output_pipeline for prefix in modelConfigPrefix: if get_energy_source(prefix) == energy_source: modelURL = get_init_url(prefix) print("get init url", modelURL) if modelURL == "" and is_support_output_type(output_type): print("init URL is not set, try using default URL".format(output_type)) - return get_url(output_type=ModelOutputType[output_type], energy_source=energy_source, model_topurl=model_topurl) + return get_url(feature_group=FeatureGroup.BPFOnly, output_type=ModelOutputType[output_type], energy_source=energy_source, model_topurl=model_topurl, pipeline_name=pipeline_name) else: return modelURL print("no match config for {}, {}".format(output_type, energy_source)) diff --git a/src/util/loader.py b/src/util/loader.py index bfb367f2..273ee515 100644 --- a/src/util/loader.py +++ b/src/util/loader.py @@ -18,12 +18,30 @@ CHECKPOINT_FOLDERNAME = 'checkpoint' PREPROCESS_FOLDERNAME = "preprocessed_data" -# TODO: change to v0.7 when the model is updated to database, need document update -# default_init_model_url = "https://raw.githubusercontent.com/sustainable-computing-io/kepler-model-db/main/models/v0.7/nx12" -DEFAULT_PIPELINE = "std_v{}".format(version) -default_init_model_url = "https://raw.githubusercontent.com/sustainable-computing-io/kepler-model-db/main/models/v0.6/nx12" +########################################################################## +# pipeline loader + +## default_train_output_pipeline: a default pipeline name which is output from the training pipeline +default_train_output_pipeline = "std_v{}".format(version) +default_pipelines = { + "intel_rapl": "ec2", + "acpi": 
"specpower" +} +base_model_url = "https://raw.githubusercontent.com/sustainable-computing-io/kepler-model-db/main/models/v{}".format(version) +def get_pipeline_url(model_topurl, pipeline_name): + file_ext = ".zip" + return os.path.join(model_topurl, pipeline_name + file_ext) + +def assure_pipeline_name(pipeline_name, energy_source, nodeCollection): + if pipeline_name == "": + pipeline_name = default_pipelines[energy_source] + if pipeline_name not in nodeCollection and default_train_output_pipeline in nodeCollection: + pipeline_name = default_train_output_pipeline + return pipeline_name +########################################################################## + default_trainer_name = "GradientBoostingRegressorTrainer" -default_node_type = 1 +default_node_type = 0 any_node_type = -1 default_feature_group = FeatureGroup.BPFOnly @@ -36,7 +54,6 @@ def load_json(path, name): res = json.load(f) return res except Exception as err: - print(err) return None def load_pkl(path, name): @@ -46,7 +63,8 @@ def load_pkl(path, name): try: res = joblib.load(filepath) return res - except Exception: + except Exception as err: + print("fail to load pkl {}: {}".format(filepath, err)) return None def load_remote_pkl(url_path): @@ -134,15 +152,46 @@ def is_valid_model(metadata, filters): def get_model_name(trainer_name, node_type): return "{}_{}".format(trainer_name, node_type) -def is_matched_type(model_name, node_type): - if node_type == any_node_type: - return True - return model_name.split("_")[-1] == str(node_type) +def get_node_type_from_name(model_name): + return int(model_name.split("_")[-1]) -def get_pipeline_path(model_toppath, pipeline_name=DEFAULT_PIPELINE): +def is_matched_type(nodeCollection, spec, pipeline_name, model_name, node_type, energy_source): + model_node_type = get_node_type_from_name(model_name) + if model_node_type == node_type: + return True + if node_type == any_node_type: + # if not specify spec, return true + if spec is None: + return True + # if covered by the model's node_type, return true + pipeline_name = assure_pipeline_name(pipeline_name, energy_source, nodeCollection) + if pipeline_name not in nodeCollection or nodeCollection[pipeline_name].node_type_index[model_node_type].cover(spec): + return True + return False + +def get_largest_candidates(model_names, pipeline_name, nodeCollection, energy_source): + pipeline_name = assure_pipeline_name(pipeline_name, energy_source, nodeCollection) + if pipeline_name not in nodeCollection: + return None + node_type_index = nodeCollection[pipeline_name].node_type_index + max_cores = 0 + candidates = dict() + for model_name in model_names: + model_node_type = get_node_type_from_name(model_name) + if model_node_type not in node_type_index: + continue + cores = node_type_index[model_node_type].get_cores() + if cores not in candidates: + candidates[cores] = [] + candidates[cores] += [model_name] + if cores > max_cores: + max_cores = cores + return candidates[max_cores] if max_cores > 0 else None + +def get_pipeline_path(model_toppath, pipeline_name): return os.path.join(model_toppath, pipeline_name) -def get_model_group_path(model_toppath, output_type, feature_group, energy_source, pipeline_name=DEFAULT_PIPELINE, assure=True): +def get_model_group_path(model_toppath, output_type, feature_group, energy_source, pipeline_name, assure=True): pipeline_path = get_pipeline_path(model_toppath, pipeline_name) energy_source_path = os.path.join(pipeline_path, energy_source) output_path = os.path.join(energy_source_path, output_type.name) @@ -186,7 +235,7 
@@ def list_pipelines(model_toppath, energy_source, model_type): pipeline_names = [f for f in os.listdir(model_toppath) if os.path.exists(os.path.join(model_toppath, f, pipeline_metadata_filename + ".csv"))] return pipeline_names -def list_all_abs_models(model_toppath, energy_source, valid_fgs, pipeline_name=DEFAULT_PIPELINE): +def list_all_abs_models(model_toppath, energy_source, valid_fgs, pipeline_name): abs_models_map = dict() for fg in valid_fgs: group_path = get_model_group_path(model_toppath, output_type=ModelOutputType.AbsPower, feature_group=fg, energy_source=energy_source, pipeline_name=pipeline_name, assure=False) @@ -194,7 +243,7 @@ def list_all_abs_models(model_toppath, energy_source, valid_fgs, pipeline_name=D abs_models_map[group_path] = model_names return abs_models_map -def list_all_dyn_models(model_toppath, energy_source, valid_fgs, pipeline_name=DEFAULT_PIPELINE): +def list_all_dyn_models(model_toppath, energy_source, valid_fgs, pipeline_name): dyn_models_map = dict() for fg in valid_fgs: group_path = get_model_group_path(model_toppath, output_type=ModelOutputType.DynPower, feature_group=fg, energy_source=energy_source, pipeline_name=pipeline_name, assure=False) @@ -250,7 +299,9 @@ def get_download_output_path(download_path, energy_source, output_type): energy_source_path = assure_path(os.path.join(download_path, energy_source)) return os.path.join(energy_source_path, output_type.name) -def get_url(output_type, feature_group=default_feature_group, trainer_name=default_trainer_name, node_type=default_node_type, model_topurl=default_init_model_url, energy_source="intel_rapl", pipeline_name=DEFAULT_PIPELINE, model_name=None, weight=False): +def get_url(output_type, feature_group, energy_source, trainer_name=default_trainer_name, node_type=default_node_type, model_topurl=base_model_url, pipeline_name=None, model_name=None, weight=False): + if pipeline_name is None: + pipeline_name = default_pipelines[energy_source] group_path = get_model_group_path(model_topurl, output_type=output_type, feature_group=feature_group, energy_source=energy_source, pipeline_name=pipeline_name, assure=False) if model_name is None: model_name = get_model_name(trainer_name, node_type) @@ -259,12 +310,6 @@ def get_url(output_type, feature_group=default_feature_group, trainer_name=defau file_ext = ".json" return os.path.join(group_path, model_name + file_ext) -def get_pipeline_url(model_topurl=default_init_model_url, pipeline_name=DEFAULT_PIPELINE, weight=False): - file_ext = ".zip" - if weight: - file_ext = ".json" - return os.path.join(model_topurl, pipeline_name + file_ext) - def class_to_json(class_obj): return json.loads(json.dumps(class_obj.__dict__)) diff --git a/src/util/saver.py b/src/util/saver.py index c86c4b9f..b34f9992 100644 --- a/src/util/saver.py +++ b/src/util/saver.py @@ -13,6 +13,9 @@ def _pipeline_model_metadata_filename(energy_source, model_type): return "{}_{}_model_metadata".format(energy_source, model_type) +def _power_curve_filename(energy_source, model_type): + return "{}_{}_power_curve".format(energy_source, model_type) + def assure_path(path): if path == '': return '' diff --git a/tests/README.md b/tests/README.md index 4c4d0810..8a39f750 100644 --- a/tests/README.md +++ b/tests/README.md @@ -158,14 +158,14 @@ Optional arguments: Reuse (test_model): ```python - from util import ModelOutputType, model_toppath, DEFAULT_PIPELINE + from util import ModelOutputType, model_toppath, default_train_output_pipeline from isolator_test import get_isolate_results from extractor_test 
import get_extract_results, get_expected_power_columns from estimator_model_test import test_model # model folder under model_toppath which is ../src/models by default - pipeline_name = DEFAULT_PIPELINE + pipeline_name = default_train_output_pipeline # get_expected_power_columns(energy_components=test_energy_components, num_of_unit=test_num_of_unit) power_columns = get_expected_power_columns() @@ -247,7 +247,7 @@ offline_trainer_client_process(dataset_name, train_prom_response, idle_prom_resp Optional arguments: - energy_source: target energy source (default: intel_rapl) - - isolators: dict map of isolator class name to argument dict map (default: {"MinIdleIsolator": {}, "NoneIsolator": {}, "ProfileBackgroundIsolator": {}, "TrainIsolator": {"abs_pipeline_name": DEFAULT_PIPELINE}}) + - isolators: dict map of isolator class name to argument dict map (default: {"MinIdleIsolator": {}, "NoneIsolator": {}, "ProfileBackgroundIsolator": {}, "TrainIsolator": {"abs_pipeline_name": default_train_output_pipeline}}) - target_path: path to save trained ouput (default: data/offline_trainer_output) # Integration Test diff --git a/tests/e2e_test.sh b/tests/e2e_test.sh index f6198280..2c9fd88a 100755 --- a/tests/e2e_test.sh +++ b/tests/e2e_test.sh @@ -58,6 +58,13 @@ wait_for_server() { kubectl rollout status deploy kepler-model-server -n kepler --timeout 5m wait_for_keyword server "initial pipeline is loaded" "server cannot load initial pipeline" wait_for_keyword server "Press CTRL+C to quit" "server has not started yet" + get_server_log + kubectl get svc kepler-model-server -n kepler + kubectl get endpoints kepler-model-server -n kepler + # restart kepler-exporter to apply server-api + kubectl delete po -n kepler -l app.kubernetes.io/component=exporter + sleep 5 + get_kepler_log } wait_for_db() { @@ -103,7 +110,6 @@ check_estimator_set_and_init() { restart_model_server() { kubectl delete po -l app.kubernetes.io/component=model-server -n kepler wait_for_server - get_server_log } test() { @@ -114,21 +120,25 @@ test() { for opt in ${DEPLOY_OPTIONS}; do export $opt=true; done; - # train and deploy local modelDB - kubectl apply -f ${top_dir}/manifests/test/file-server.yaml - sleep 10 - wait_for_db + # patch MODEL_TOPURL environment if DB is not available + if [ -z ${DB} ]; then + + # train and deploy local modelDB + kubectl apply -f ${top_dir}/manifests/test/file-server.yaml + sleep 10 + wait_for_db + + if [ ! -z ${ESTIMATOR} ]; then + kubectl patch configmap -n kepler kepler-cfm --type merge -p "$(cat ${top_dir}/manifests/test/patch-estimator-sidecar.yaml)" + kubectl patch ds kepler-exporter -n kepler -p '{"spec":{"template":{"spec":{"containers":[{"name":"estimator","env":[{"name":"MODEL_TOPURL","value":"http://model-db.kepler.svc.cluster.local:8110"}]}]}}}}' + fi + if [ ! -z ${SERVER} ]; then + kubectl patch deploy kepler-model-server -n kepler -p '{"spec":{"template":{"spec":{"containers":[{"name":"server-api","env":[{"name":"MODEL_TOPURL","value":"http://model-db.kepler.svc.cluster.local:8110"}]}]}}}}' + kubectl delete po -n kepler -l app.kubernetes.io/component=model-server + fi + kubectl delete po -n kepler -l app.kubernetes.io/component=exporter - # patch MODEL_TOPURL environment - if [ ! 
-z ${ESTIMATOR} ]; then - kubectl patch configmap -n kepler kepler-cfm --type merge -p "$(cat ${top_dir}/manifests/test/patch-estimator-sidecar.yaml)" - kubectl patch ds kepler-exporter -n kepler -p '{"spec":{"template":{"spec":{"containers":[{"name":"estimator","env":[{"name":"MODEL_TOPURL","value":"http://model-db.kepler.svc.cluster.local:8110"}]}]}}}}' - fi - if [ ! -z ${SERVER} ]; then - kubectl patch deploy kepler-model-server -n kepler -p '{"spec":{"template":{"spec":{"containers":[{"name":"server-api","env":[{"name":"MODEL_TOPURL","value":"http://model-db.kepler.svc.cluster.local:8110"}]}]}}}}' - kubectl delete po -n kepler -l app.kubernetes.io/component=model-server fi - kubectl delete po -n kepler -l app.kubernetes.io/component=exporter if [ ! -z ${ESTIMATOR} ]; then # with estimator @@ -148,9 +158,12 @@ test() { if [ ! -z ${SERVER} ]; then # with server wait_for_server + get_estimator_log + sleep 5 wait_for_keyword estimator "load model from model server" "estimator should be able to load model from server" else # no server + get_estimator_log wait_for_keyword estimator "load model from config" "estimator should be able to load model from config" fi else @@ -162,9 +175,10 @@ test() { kubectl patch ds kepler-exporter -n kepler --patch-file ${top_dir}/manifests/test/model-request-client.yaml restart_model_server sleep 1 - wait_for_kepler + wait_for_server wait_for_keyword kepler Done "cannot get model weight" else + wait_for_server wait_for_keyword kepler "getWeightFromServer.*core" "kepler should get weight from server" fi fi diff --git a/tests/estimator_model_request_test.py b/tests/estimator_model_request_test.py index 2c290c88..e3c2e017 100644 --- a/tests/estimator_model_request_test.py +++ b/tests/estimator_model_request_test.py @@ -21,6 +21,7 @@ os.environ['MODEL_SERVER_URL'] = 'http://localhost:8100' model_topurl = 'http://localhost:{}'.format(file_server_port) os.environ['MODEL_TOPURL'] = model_topurl +os.environ['INITIAL_PIPELINE_URL'] = os.path.join(model_topurl, "std_v0.7") server_path = os.path.join(os.path.dirname(__file__), '../src') util_path = os.path.join(os.path.dirname(__file__), '../src/util') @@ -36,7 +37,7 @@ from http_server import http_file_server from train_types import FeatureGroups, FeatureGroup, ModelOutputType -from loader import get_download_output_path +from loader import get_download_output_path, default_train_output_pipeline from estimate.estimator import handle_request, loaded_model, PowerRequest from estimate.model_server_connector import list_all_models from estimate.archived_model import get_achived_model, reset_failed_list @@ -54,6 +55,7 @@ # test getting model from server os.environ['MODEL_SERVER_ENABLE'] = "true" available_models = list_all_models() + assert len(available_models) > 0, "must have more than one available models" print("Available Models:", available_models) for output_type_name, valid_fgs in available_models.items(): output_type = ModelOutputType[output_type_name] @@ -64,13 +66,12 @@ if output_type.name in loaded_model and energy_source in loaded_model[output_type.name]: del loaded_model[output_type.name][energy_source] metrics = FeatureGroups[FeatureGroup[fg_name]] - request_json = generate_request(None, n=10, metrics=metrics, output_type=output_type_name) + request_json = generate_request(train_name=None, n=10, metrics=metrics, output_type=output_type_name) data = json.dumps(request_json) output = handle_request(data) print("result {}/{} from model server: {}".format(output_type_name, fg_name, output)) assert 
len(output['powers']) > 0, "cannot get power {}\n {}".format(output['msg'], request_json) - # test with initial models os.environ['MODEL_SERVER_ENABLE'] = "false" for output_type in ModelOutputType: @@ -106,7 +107,7 @@ if os.path.exists(output_path): shutil.rmtree(output_path) # valid model - os.environ[init_url_key] = get_url(output_type=output_type, feature_group=FeatureGroup.CgroupOnly, model_topurl=model_topurl) + os.environ[init_url_key] = get_url(energy_source=energy_source, output_type=output_type, feature_group=FeatureGroup.CgroupOnly, model_topurl=model_topurl, pipeline_name=default_train_output_pipeline) print("Requesting from ", os.environ[init_url_key]) request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.CgroupOnly], output_type=output_type_name) data = json.dumps(request_json) @@ -115,14 +116,14 @@ print("result {}/{} from static set: {}".format(output_type_name, FeatureGroup.CgroupOnly.name, output)) del loaded_model[output_type_name][energy_source] # invalid model - os.environ[init_url_key] = get_url(output_type=output_type, feature_group=FeatureGroup.BPFOnly, model_topurl=model_topurl) + os.environ[init_url_key] = get_url(energy_source=energy_source, output_type=output_type, feature_group=FeatureGroup.BPFOnly, model_topurl=model_topurl, pipeline_name=default_train_output_pipeline) print("Requesting from ", os.environ[init_url_key]) request_json = generate_request(None, n=10, metrics=FeatureGroups[FeatureGroup.CgroupOnly], output_type=output_type_name) data = json.dumps(request_json) power_request = json.loads(data, object_hook = lambda d : PowerRequest(**d)) output_path = get_achived_model(power_request) assert output_path is None, "model should be invalid\n {}".format(output_path) - os.environ['MODEL_CONFIG'] = "{}=true\n{}={}\n".format(estimator_enable_key,init_url_key,get_url(output_type=output_type, feature_group=FeatureGroup.CgroupOnly, model_topurl=model_topurl)) + os.environ['MODEL_CONFIG'] = "{}=true\n{}={}\n".format(estimator_enable_key,init_url_key,get_url(energy_source=energy_source, output_type=output_type, feature_group=FeatureGroup.CgroupOnly, model_topurl=model_topurl, pipeline_name=default_train_output_pipeline)) set_env_from_model_config() print("Requesting from ", os.environ[init_url_key]) reset_failed_list() diff --git a/tests/estimator_model_test.py b/tests/estimator_model_test.py index af894f2a..7704f53d 100644 --- a/tests/estimator_model_test.py +++ b/tests/estimator_model_test.py @@ -15,7 +15,7 @@ from estimate import load_model, default_predicted_col_func, compute_error from train.trainer import model_toppath -from util.loader import get_model_group_path, DEFAULT_PIPELINE +from util.loader import get_model_group_path, default_train_output_pipeline from util import FeatureGroup, ModelOutputType, list_model_names from util.prom_types import TIMESTAMP_COL @@ -51,7 +51,7 @@ def test_model(group_path, model_name, test_data_with_label, power_columns, powe # all energy_source, model_output, feature_group def test_all_models(test_data_with_label, power_columns, output_type, feature_group, energy_source=test_energy_source): result_df_list = [] - group_path = get_model_group_path(model_toppath, output_type, feature_group, energy_source, assure=False, pipeline_name=DEFAULT_PIPELINE) + group_path = get_model_group_path(model_toppath, output_type, feature_group, energy_source, assure=False, pipeline_name=default_train_output_pipeline) model_names = list_model_names(group_path) for model_name in model_names: result_df, _, _ = 
test_model(group_path, model_name, test_data_with_label, power_columns) diff --git a/tests/estimator_power_request_test.py b/tests/estimator_power_request_test.py index 5c982131..09676e86 100644 --- a/tests/estimator_power_request_test.py +++ b/tests/estimator_power_request_test.py @@ -7,7 +7,7 @@ util_path = os.path.join(os.path.dirname(__file__), '..', 'src', 'util') sys.path.append(util_path) -from train_types import WORKLOAD_FEATURES, SYSTEM_FEATURES, ModelOutputType, CATEGORICAL_LABEL_TO_VOCAB +from train_types import WORKLOAD_FEATURES, SYSTEM_FEATURES, ModelOutputType, CATEGORICAL_LABEL_TO_VOCAB, PowerSourceMap from config import SERVE_SOCKET from extractor_test import test_energy_source @@ -27,6 +27,14 @@ def generate_request(train_name, n=1, metrics=WORKLOAD_FEATURES, system_features request_json['source'] = energy_source return request_json +def process(energy_source): + request_json = generate_request(trainer_names[0], 2, output_type="AbsPower", energy_source=energy_source) + res = client.make_request(request_json) + res_json = json.loads(res) + print(res_json) + assert res_json["msg"]=="", "response error: {}".format(res_json["msg"]) + assert len(res_json["powers"]) > 0, "zero powers" + class Client: def __init__(self, socket_path): self.socket_path = socket_path @@ -49,9 +57,5 @@ def make_request(self, request_json): if __name__ == '__main__': client = Client(SERVE_SOCKET) - request_json = generate_request(trainer_names[0], 2, output_type="AbsPower") - res = client.make_request(request_json) - res_json = json.loads(res) - print(res_json) - assert res_json["msg"]=="", "response error: {}".format(res_json["msg"]) - assert len(res_json["powers"]) > 0, "zero powers" \ No newline at end of file + for energy_source in PowerSourceMap.keys(): + process(energy_source) diff --git a/tests/model_server_test.py b/tests/model_server_test.py index 938a669e..1fbc50e7 100644 --- a/tests/model_server_test.py +++ b/tests/model_server_test.py @@ -55,8 +55,12 @@ def get_models(): assert len(models) > 0, "more than one type of output" for output_models in models.values(): assert len(output_models) > 0, "more than one best model for each output" + + test_feature_groups = [FeatureGroup.BPFOnly, FeatureGroup.CounterOnly] + # for each features - for feature_group, metrics in FeatureGroups.items(): + for fg in test_feature_groups: + metrics = FeatureGroups[fg] # abs power output_type = ModelOutputType.AbsPower make_request(metrics, output_type) @@ -65,7 +69,8 @@ def get_models(): output_type = ModelOutputType.DynPower make_request(metrics, output_type) make_request(metrics, output_type, weight=True) - metrics = FeatureGroups[FeatureGroup.Full] + + metrics = FeatureGroups[FeatureGroup.BPFOnly] # with node_type make_request(metrics, output_type, node_type=1) make_request(metrics, output_type, node_type=1, weight=True) diff --git a/tests/offline_trainer_test.py b/tests/offline_trainer_test.py index 7064068d..990eb98a 100644 --- a/tests/offline_trainer_test.py +++ b/tests/offline_trainer_test.py @@ -45,7 +45,7 @@ "MinIdleIsolator": {}, "NoneIsolator": {}, "ProfileBackgroundIsolator": {} - # "TrainIsolator": {"abs_pipeline_name": DEFAULT_PIPELINE} # TODO: too heavy to test on CI + # "TrainIsolator": {"abs_pipeline_name": default_train_output_pipeline} # TODO: too heavy to test on CI } def get_target_path(save_path, energy_source, feature_group): diff --git a/tests/pipeline_test.py b/tests/pipeline_test.py index 81eaded3..1fbec67f 100644 --- a/tests/pipeline_test.py +++ b/tests/pipeline_test.py @@ -7,17 +7,27 
@@ sys.path.append(src_path) ################################################################# -from train import NewPipeline +from train import NewPipeline, NodeTypeSpec from util import get_valid_feature_group_from_queries, PowerSourceMap -from util.loader import DEFAULT_PIPELINE +from util.loader import default_train_output_pipeline, default_node_type from prom_test import get_query_results, prom_output_path, prom_output_filename from extractor_test import test_extractors, test_energy_source from isolator_test import test_isolators from trainer_test import test_trainer_names, assert_train +# fake spec value +spec_values = { + "processor": "test", + "cores": 1, + "chips": 1, + "memory_gb": -1, + "cpu_freq_mhz": -1 + } +spec = NodeTypeSpec(**spec_values) + def assert_pipeline(pipeline, query_results, feature_group, energy_source, energy_components): - success, abs_data, dyn_data = pipeline.process(query_results, energy_components, energy_source, feature_group=feature_group.name) + success, abs_data, dyn_data = pipeline.process(query_results, energy_components, energy_source, feature_group=feature_group.name, replace_node_type=default_node_type) assert success, "failed to process pipeline {}".format(pipeline.name) for trainer in pipeline.trainers: if trainer.feature_group == feature_group and trainer.energy_source == energy_source: @@ -26,19 +36,23 @@ def assert_pipeline(pipeline, query_results, feature_group, energy_source, energ else: assert_train(trainer, dyn_data, energy_components) -def process(save_pipeline_name=DEFAULT_PIPELINE, prom_save_path=prom_output_path, prom_save_name=prom_output_filename, abs_trainer_names=test_trainer_names, dyn_trainer_names=test_trainer_names, extractors=test_extractors, isolators=test_isolators, target_energy_sources=[test_energy_source], valid_feature_groups=None): +def process(save_pipeline_name=default_train_output_pipeline, prom_save_path=prom_output_path, prom_save_name=prom_output_filename, abs_trainer_names=test_trainer_names, dyn_trainer_names=test_trainer_names, extractors=test_extractors, isolators=test_isolators, target_energy_sources=[test_energy_source], valid_feature_groups=None): query_results = get_query_results(save_path=prom_save_path, save_name=prom_save_name) if valid_feature_groups is None: valid_feature_groups = get_valid_feature_group_from_queries(query_results.keys()) for extractor in extractors: for isolator in isolators: pipeline = NewPipeline(save_pipeline_name, abs_trainer_names, dyn_trainer_names, extractor=extractor, isolator=isolator, target_energy_sources=target_energy_sources ,valid_feature_groups=valid_feature_groups) + global spec + pipeline.node_collection.index_train_machine("test", spec) for energy_source in target_energy_sources: energy_components = PowerSourceMap[energy_source] for feature_group in valid_feature_groups: assert_pipeline(pipeline, query_results, feature_group, energy_source, energy_components) # save metadata pipeline.save_metadata() + # save node collection + pipeline.node_collection.save() # save pipeline pipeline.archive_pipeline() diff --git a/tests/trainer_test.py b/tests/trainer_test.py index 6e4bdaa4..72fe1246 100644 --- a/tests/trainer_test.py +++ b/tests/trainer_test.py @@ -13,7 +13,7 @@ from train import load_class from util import PowerSourceMap -from util.loader import DEFAULT_PIPELINE +from util.loader import default_train_output_pipeline from util.train_types import default_trainer_names from isolator_test import test_isolators, get_isolate_results @@ -39,7 +39,7 @@ def 
assert_train(trainer, data, energy_components): except sklearn.exceptions.NotFittedError: pass -def process(node_level, feature_group, result, trainer_names=test_trainer_names, energy_source=test_energy_source, power_columns=get_expected_power_columns(), pipeline_name=DEFAULT_PIPELINE): +def process(node_level, feature_group, result, trainer_names=test_trainer_names, energy_source=test_energy_source, power_columns=get_expected_power_columns(), pipeline_name=default_train_output_pipeline): energy_components = PowerSourceMap[energy_source] train_items = [] for trainer_name in trainer_names: @@ -50,7 +50,7 @@ def process(node_level, feature_group, result, trainer_names=test_trainer_names, train_items += [trainer.get_metadata()] return pd.concat(train_items) -def process_all(extractors=test_extractors, isolators=test_isolators, trainer_names=test_trainer_names, energy_source=test_energy_source, power_columns=get_expected_power_columns(), pipeline_name=DEFAULT_PIPELINE): +def process_all(extractors=test_extractors, isolators=test_isolators, trainer_names=test_trainer_names, energy_source=test_energy_source, power_columns=get_expected_power_columns(), pipeline_name=default_train_output_pipeline): abs_train_list = [] dyn_train_list = [] for extractor in extractors:
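To make the intent of the revised spec matching easier to follow, here is a minimal standalone Python sketch (not part of the patch; `attr_matches` is a hypothetical helper name) of the per-attribute comparison rule that the updated `NodeTypeSpec.cover()` applies: values that parse as floats are compared numerically, and anything else falls back to a plain string comparison, so `"16"` and `16.0` now count as the same core count where a pure string comparison would not. The patched method loops this check over every `NodeAttribute`; the sketch isolates just one attribute.

```python
# Standalone illustration only -- assumes the float-first, string-fallback rule
# described above; not the patched NodeTypeSpec.cover() implementation verbatim.
def attr_matches(own_value, compare_value):
    if compare_value is None:
        return True  # an unspecified attribute in the compared spec never disqualifies a match
    try:
        # compare numerically when both sides can be parsed as floats
        return float(own_value) == float(compare_value)
    except (TypeError, ValueError):
        # otherwise fall back to string comparison
        return str(own_value) == str(compare_value)

assert attr_matches("16", 16.0)          # mixed str/float core counts match numerically
assert not attr_matches("16", "32")      # numeric mismatch
assert attr_matches("x86_64", "x86_64")  # non-numeric values compare as strings
assert attr_matches("anything", None)    # missing attribute is treated as "don't care"
```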