Bug fixes and update Legion version #1259

Merged: 17 commits, Jan 26, 2024
12 changes: 6 additions & 6 deletions .github/workflows/gpu-ci.yml
@@ -222,7 +222,7 @@ jobs:
CONDA: "3"
needs: inference-tests
container:
- image: ghcr.io/flexflow/flexflow-environment-cuda:latest
+ image: ghcr.io/flexflow/flexflow-environment-cuda-11.8:latest
options: --gpus all --shm-size=8192m
steps:
- name: Install updated git version
@@ -243,7 +243,7 @@ jobs:

- name: Build and Install FlexFlow
run: |
- export PATH=/opt/conda/bin:$PATH
+ export PATH=$CONDA_PREFIX/bin:$PATH
export FF_HOME=$(pwd)
export FF_BUILD_ALL_EXAMPLES=ON
export FF_BUILD_ALL_INFERENCE_EXAMPLES=ON
@@ -252,18 +252,18 @@ jobs:

- name: Check FlexFlow Python interface (pip)
run: |
- export PATH=/opt/conda/bin:$PATH
+ export PATH=$CONDA_PREFIX/bin:$PATH
export FF_HOME=$(pwd)
- export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/conda/lib
+ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$CONDA_PREFIX/lib
./tests/python_interface_test.sh after-installation

- name: Run multi-gpu tests
run: |
- export PATH=/opt/conda/bin:$PATH
+ export PATH=$CONDA_PREFIX/bin:$PATH
export CUDNN_DIR=/usr/local/cuda
export CUDA_DIR=/usr/local/cuda
export FF_HOME=$(pwd)
- export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/conda/lib
+ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$CONDA_PREFIX/lib
# C++ tests
./tests/cpp_gpu_tests.sh 4
# Python tests
8 changes: 5 additions & 3 deletions CMakeLists.txt
@@ -413,6 +413,7 @@ if(NOT BUILD_LEGION_ONLY)

# python related
if (FF_USE_PYTHON)
+ find_package(Python COMPONENTS Interpreter Development)
# create flexflow_cffi_header.py
add_custom_command(TARGET flexflow
PRE_BUILD
@@ -424,13 +425,13 @@ if(NOT BUILD_LEGION_ONLY)
# generate the Legion Python bindings library. When building from pip, we need to do this post-install to prevent Legion from overwriting the path to the Legion shared library
add_custom_command(TARGET flexflow
POST_BUILD
- COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/deps/legion/bindings/python/setup.py build --cmake-build-dir ${Legion_BINARY_DIR}/runtime --prefix ${Legion_BINARY_DIR} --build-lib=${Legion_BINARY_DIR}/bindings/python ${Legion_PYTHON_EXTRA_INSTALL_ARGS}
+ COMMAND ${Python_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/deps/legion/bindings/python/setup.py build --cmake-build-dir ${Legion_BINARY_DIR}/runtime --prefix ${Legion_BINARY_DIR} --build-lib=${Legion_BINARY_DIR}/bindings/python ${Legion_PYTHON_EXTRA_INSTALL_ARGS}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/deps/legion/bindings/python
)
# create flexflow_python interpreter. When building from pip, we install the FF_HOME/python/flexflow_python script instead.
add_custom_command(TARGET flexflow
PRE_BUILD
- COMMAND ${PYTHON_EXECUTABLE} ${FLEXFLOW_ROOT}/python/flexflow_python_build.py --build-dir ${CMAKE_BINARY_DIR}
+ COMMAND ${Python_EXECUTABLE} ${FLEXFLOW_ROOT}/python/flexflow_python_build.py --build-dir ${CMAKE_BINARY_DIR}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
COMMENT "Creating flexflow_python interpreter..."
)
@@ -567,7 +568,8 @@ if(NOT BUILD_LEGION_ONLY)
install(TARGETS flexflow DESTINATION ${LIB_DEST})
# install python
if (FF_USE_PYTHON)
- execute_process(COMMAND ${PYTHON_EXECUTABLE} -c "import site, os; print([pkg for func in (site.getsitepackages(), site.getusersitepackages()) for pkg in ([func] if isinstance(func, str) else func) if os.access(pkg, os.W_OK)][0])" OUTPUT_VARIABLE PY_DEST OUTPUT_STRIP_TRAILING_WHITESPACE)
+ find_package(Python COMPONENTS Interpreter Development)
+ execute_process(COMMAND ${Python_EXECUTABLE} -c "import site, os; print([pkg for func in (site.getsitepackages(), site.getusersitepackages()) for pkg in ([func] if isinstance(func, str) else func) if os.access(pkg, os.W_OK)][0])" OUTPUT_VARIABLE PY_DEST OUTPUT_STRIP_TRAILING_WHITESPACE)
if (NOT FF_BUILD_FROM_PYPI)
install(
DIRECTORY ${FLEXFLOW_ROOT}/python/flexflow/
4 changes: 2 additions & 2 deletions cmake/pip_install/CMakeLists.txt
@@ -1,10 +1,10 @@
# Use setup.py script to re-install the Python bindings library with the right library paths
if (FF_USE_PYTHON)
- execute_process(COMMAND ${PYTHON_EXECUTABLE} -c "import site, os; print([pkg for func in (site.getsitepackages(), site.getusersitepackages()) for pkg in ([func] if isinstance(func, str) else func) if os.access(pkg, os.W_OK)][0])" OUTPUT_VARIABLE PY_DEST OUTPUT_STRIP_TRAILING_WHITESPACE)
+ execute_process(COMMAND ${Python_EXECUTABLE} -c "import site, os; print([pkg for func in (site.getsitepackages(), site.getusersitepackages()) for pkg in ([func] if isinstance(func, str) else func) if os.access(pkg, os.W_OK)][0])" OUTPUT_VARIABLE PY_DEST OUTPUT_STRIP_TRAILING_WHITESPACE)
if(FF_BUILD_FROM_PYPI)
install(CODE "execute_process(COMMAND ${CMAKE_COMMAND} -E echo \"Editing path to Legion library using path: ${PY_DEST}/flexflow/lib \")")
# CMAKE_CURRENT_SOURCE_DIR=/usr/FlexFlow/cmake/pip_install
# Legion_BINARY_DIR=/usr/FlexFlow/build/<something>/deps/legion
- install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/../../deps/legion/bindings/python/setup.py install --cmake-build-dir ${Legion_BINARY_DIR}/runtime --prefix ${PY_DEST}/flexflow ${Legion_PYTHON_EXTRA_INSTALL_ARGS} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../deps/legion/bindings/python)")
+ install(CODE "execute_process(COMMAND ${Python_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/../../deps/legion/bindings/python/setup.py install --cmake-build-dir ${Legion_BINARY_DIR}/runtime --prefix ${PY_DEST}/flexflow ${Legion_PYTHON_EXTRA_INSTALL_ARGS} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../../deps/legion/bindings/python)")
endif()
endif()
2 changes: 1 addition & 1 deletion deps/legion
Submodule legion updated from 626b55 to 24e8c4
9 changes: 4 additions & 5 deletions include/flexflow/mapper.h
@@ -83,11 +83,10 @@ class FFMapper : public NullMapper {
Task const &task,
MapTaskInput const &input,
MapTaskOutput &output);
- virtual void map_replicate_task(const MapperContext ctx,
- Task const &task,
- MapTaskInput const &input,
- MapTaskOutput const &default_output,
- MapReplicateTaskOutput &output);
+ virtual void replicate_task(const MapperContext ctx,
+ Task const &task,
+ ReplicateTaskInput const &input,
+ ReplicateTaskOutput &output);
virtual void select_task_variant(const MapperContext ctx,
Task const &task,
SelectVariantInput const &input,
2 changes: 2 additions & 0 deletions include/flexflow/model.h
@@ -202,6 +202,7 @@ enum TaskIDs {
// NCCL tasks
NCCL_GETUNIQUEID_TASK_ID,
NCCL_INIT_COMMS_TASK_ID,
+ NCCL_FINISH_COMMS_TASK_ID,
// Search
STRATEGY_SEARCH_TASK_ID,
// Graph
@@ -397,6 +398,7 @@ std::vector<ParallelTensorShape>
class FFModel {
public:
FFModel(FFConfig &config, bool cpu_offload = false);
+ ~FFModel();

static constexpr float PROPAGATION_CHANCE = 0.25;
static constexpr float CONTINUE_PROPAGATION_CHANCE = 0.75;
5 changes: 5 additions & 0 deletions include/flexflow/operator.h
@@ -406,6 +406,11 @@ class Op {
std::vector<Legion::PhysicalRegion> const &regions,
Legion::Context ctx,
Legion::Runtime *runtime);
+ static void
+ finish_nccl_comms_task(Legion::Task const *task,
+ std::vector<Legion::PhysicalRegion> const &regions,
+ Legion::Context ctx,
+ Legion::Runtime *runtime);
#endif
protected:
void set_argumentmap_for_init(FFModel const &ff, Legion::ArgumentMap &argmap);
1 change: 0 additions & 1 deletion include/flexflow/request_manager.h
@@ -55,7 +55,6 @@ class InferenceManager {
public:
std::unordered_map<ParallelTensor, std::vector<ParallelTensor>> tensor_buffer;
std::unordered_map<FFModel *, FileDataLoader *> model_weights_loaders;
- int num_devices;
};

struct Request {
47 changes: 20 additions & 27 deletions src/mapper/mapper.cc
@@ -661,44 +661,37 @@ void FFMapper::map_task(const MapperContext ctx,
} // for idx
}

- void FFMapper::map_replicate_task(const MapperContext ctx,
- Task const &task,
- MapTaskInput const &input,
- MapTaskOutput const &default_output,
- MapReplicateTaskOutput &output) {
+ void FFMapper::replicate_task(const MapperContext ctx,
+ Task const &task,
+ ReplicateTaskInput const &input,
+ ReplicateTaskOutput &output) {
// Should only be replicated for the top-level task
assert((task.get_depth() == 0) && (task.regions.size() == 0));
const Processor::Kind target_kind = task.target_proc.kind();
- VariantID chosen_variant;
+ VariantID vid;
{
std::vector<VariantID> variant_ids;
- runtime->find_valid_variants(
- ctx, task.task_id, variant_ids, task.target_proc.kind());
+ runtime->find_valid_variants(ctx, task.task_id, variant_ids, target_kind);
// Currently assume there is exactly one variant
assert(variant_ids.size() == 1);
- chosen_variant = variant_ids[0];
+ output.chosen_variant = variant_ids[0];
}
- std::vector<Processor> const &all_procs = all_procs_by_kind(target_kind);
- // Place on replicate on each node by default
- output.task_mappings.resize(total_nodes, default_output);
- // Assume default_output does not include any target_procs
- assert(default_output.target_procs.size() == 0);
- for (std::vector<Processor>::const_iterator it = all_procs.begin();
- it != all_procs.end();
+ output.target_processors.resize(total_nodes);
+ std::vector<bool> handled(total_nodes, false);
+ size_t count = 0;
+ Machine::ProcessorQuery procs(machine);
+ procs.only_kind(target_kind);
+ for (Machine::ProcessorQuery::iterator it = procs.begin(); it != procs.end();
it++) {
- AddressSpace space = it->address_space();
- assert(space < output.task_mappings.size());
- // Add *it as a target_proc if we haven't found one
- if (output.task_mappings[space].target_procs.size() == 0) {
- output.task_mappings[space].target_procs.push_back(*it);
+ const AddressSpace space = it->address_space();
+ if (handled[space]) {
+ continue;
}
+ output.target_processors[space] = *it;
+ handled[space] = true;
+ count++;
}
- output.control_replication_map.resize(total_nodes);
- for (int idx = 0; idx < total_nodes; idx++) {
- output.task_mappings[idx].chosen_variant = chosen_variant;
- output.control_replication_map[idx] =
- output.task_mappings[idx].target_procs[0];
- }
+ assert(count == total_nodes);
}

void FFMapper::select_task_variant(const MapperContext ctx,
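Legion's newer mapper interface replaces map_replicate_task with replicate_task: instead of building a full MapTaskOutput per node, the mapper now reports a single chosen_variant and exactly one target processor per address space. Below is a minimal sketch of that selection policy in plain C++; Proc and pick_one_per_node are hypothetical stand-ins for Legion's Processor and the mapper logic, not FlexFlow or Legion APIs.

#include <cassert>
#include <cstddef>
#include <vector>

struct Proc {
  int id;            // stand-in for a Legion processor handle
  int address_space; // which node the processor lives on
};

std::vector<Proc> pick_one_per_node(std::vector<Proc> const &procs,
                                    size_t total_nodes) {
  std::vector<Proc> chosen(total_nodes, Proc{-1, -1});
  std::vector<bool> handled(total_nodes, false);
  size_t count = 0;
  for (Proc const &p : procs) {
    if (handled[p.address_space]) {
      continue; // this node already has its target processor
    }
    chosen[p.address_space] = p;
    handled[p.address_space] = true;
    count++;
  }
  // Mirrors the mapper's assert(count == total_nodes): every node must
  // contribute at least one processor of the requested kind.
  assert(count == total_nodes);
  return chosen;
}

int main() {
  // Two nodes with two GPUs each; the first GPU seen on each node is chosen.
  std::vector<Proc> procs = {{0, 0}, {1, 0}, {2, 1}, {3, 1}};
  std::vector<Proc> chosen = pick_one_per_node(procs, 2);
  assert(chosen[0].id == 0 && chosen[1].id == 2);
  return 0;
}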
8 changes: 2 additions & 6 deletions src/ops/linear.cc
@@ -467,12 +467,8 @@ OpMeta *Linear::init_task_with_dim(Task const *task,
ctx,
runtime,
false /*readOutput*/);
- // TensorAccessorW<WT, NDIM> acc_kernel(regions[2],
- // task->regions[2],
- // FID_DATA,
- // ctx,
- // runtime,
- // false /*readOutput*/);
+ TensorAccessorR<WT, NDIM> acc_kernel(
+ regions[2], task->regions[2], FID_DATA, ctx, runtime);

// TensorAccessorR<float, 1> acc_bias(
// regions[3], task->regions[3], FID_DATA, ctx, runtime);
30 changes: 1 addition & 29 deletions src/runtime/inference_manager.cc
@@ -28,33 +28,7 @@ using namespace Legion;
LegionRuntime::Logger::Category log_inf_mgr("InferenceManager");
LegionRuntime::Logger::Category log_offload("Offloading");

- InferenceManager::InferenceManager() {
- #ifdef DEADCODE
- num_devices = ff_config.workersPerNode * ff_config.numNodes;
- // Check parallelization degrees
- assert(ff_config.data_parallelism_degree <= num_devices &&
- "Data parallelism degree exceeds number of available devices");
- assert(num_devices % ff_config.data_parallelism_degree == 0 &&
- "Number of available devices is not divisible by data parallelism "
- "degree");
- assert(ff_config.tensor_parallelism_degree <= num_devices &&
- "Tensor parallelism degree exceeds number of available devices");
- assert(num_devices % ff_config.tensor_parallelism_degree == 0 &&
- "Number of available devices is not divisible by tensor parallelism "
- "degree");
- assert(ff_config.pipeline_parallelism_degree <= num_devices &&
- "Pipeline parallelism degree exceeds number of available devices");
- assert(num_devices % ff_config.pipeline_parallelism_degree == 0 &&
- "Number of available devices is not divisible by pipeline parallelism "
- "degree");
- assert(ff_config.data_parallelism_degree *
- ff_config.tensor_parallelism_degree *
- ff_config.pipeline_parallelism_degree ==
- num_devices &&
- "Product of data, tensor, and pipeline parallelism degrees does not "
- "match the number of available devices");
- #endif
- }
+ InferenceManager::InferenceManager() {}

InferenceManager *inference_manager_singleton = nullptr;

@@ -296,8 +270,6 @@ void InferenceManager::compile_model_and_allocate_buffer(FFModel *model) {
void InferenceManager::init_operators_inference(FFModel *model) {
for (int batch_index = 0; batch_index < model->config.data_parallelism_degree;
batch_index++) {
- int expert_device_index = 0;
- int device_index = batch_index % num_devices;
for (size_t o = 0; o < model->operators.size(); o++) {
Op *op = model->operators[o];
if (op->op_type == OP_WEIGHT) {
61 changes: 61 additions & 0 deletions src/runtime/model.cc
@@ -606,6 +606,15 @@ ncclComm_t Op::init_nccl_comms_task(Task const *task,
// ncclComm, allRanks, myRank, ncclId);
return ncclComm;
}

+ void Op::finish_nccl_comms_task(Task const *task,
+ std::vector<PhysicalRegion> const &regions,
+ Context ctx,
+ Runtime *runtime) {
+ ncclComm_t comm = *((ncclComm_t *)task->local_args);
+ checkNCCL(ncclCommFinalize(comm));
+ checkNCCL(ncclCommDestroy(comm));
+ }
#endif
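The new task finalizes each communicator before destroying it, which is the teardown order NCCL expects; ncclCommFinalize is assumed available since the PR itself calls it (it requires a fairly recent NCCL release). A standalone sketch of the full communicator lifecycle for a single-process, single-GPU setup follows; CHECK is a local helper for the sketch, not FlexFlow's checkNCCL macro.

#include <cassert>
#include <cuda_runtime.h>
#include <nccl.h>

#define CHECK(cmd) do { ncclResult_t r = (cmd); assert(r == ncclSuccess); } while (0)

int main() {
  assert(cudaSetDevice(0) == cudaSuccess);

  // Create: one unique id, shared by all ranks (only one rank here).
  ncclUniqueId id;
  CHECK(ncclGetUniqueId(&id));
  ncclComm_t comm;
  CHECK(ncclCommInitRank(&comm, /*nranks=*/1, id, /*rank=*/0));

  // ... collectives would run here ...

  // Teardown mirrors finish_nccl_comms_task: finalize flushes outstanding
  // operations, then destroy frees the handle.
  CHECK(ncclCommFinalize(comm));
  CHECK(ncclCommDestroy(comm));
  return 0;
}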

/**
@@ -1578,6 +1587,43 @@ FFModel::FFModel(FFConfig &_config, bool cpu_offload)
model_id = model_counter++;
}

+ FFModel::~FFModel() {
+ // Destroy nccl communication groups
+ #ifdef FF_USE_NCCL
+ Context ctx = config.lg_ctx;
+ Runtime *runtime = config.lg_hlr;
+ for (auto const &comm : view_hash_to_nccl_comms) {
+ // Find the machine view that has the hash
+ MachineView view;
+ for (size_t l = 0; l < operators.size(); l++) {
+ view = operators[l]->outputs[0]->machine_view;
+ if (view.hash() == comm.first) {
+ break;
+ }
+ }
+ assert(view.hash() == comm.first && "Cannot find the machine view");
+ IndexSpace task_is = get_or_create_task_is(view);
+ Domain domain = runtime->get_index_space_domain(ctx, task_is);
+ ArgumentMap argmap;
+ int idx = 0;
+ for (Domain::DomainPointIterator it(domain); it; it++, idx++) {
+ argmap.set_point(*it,
+ TaskArgument(&comm.second[idx], sizeof(ncclComm_t)));
+ }
+ IndexLauncher index_launcher(NCCL_FINISH_COMMS_TASK_ID,
+ task_is,
+ TaskArgument(nullptr, 0),
+ argmap,
+ Predicate::TRUE_PRED,
+ false /*must*/,
+ 0 /*mapper_id*/,
+ comm.first);
+ FutureMap fm = runtime->execute_index_space(ctx, index_launcher);
+ fm.wait_all_results();
+ }
+ #endif
+ }
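The destructor fans one ncclComm_t out per index-space point through ArgumentMap::set_point, so each device's finish task receives exactly the communicator it initialized. A plain-C++ stand-in for that per-point argument fan-out (no Legion here; Comm and launch_per_point are hypothetical):

#include <cstddef>
#include <functional>
#include <unordered_map>
#include <vector>

using Comm = int; // hypothetical stand-in for ncclComm_t

// Hand argument idx to point task idx, the way ArgumentMap::set_point gives
// each index-space point its own communicator before the index launch.
void launch_per_point(std::vector<Comm> const &comms,
                      std::function<void(Comm)> const &point_task) {
  for (std::size_t idx = 0; idx < comms.size(); idx++) {
    point_task(comms[idx]); // Legion would run these points on their devices
  }
}

int main() {
  // One machine view (keyed by hash) owning four per-device communicators.
  std::unordered_map<std::size_t, std::vector<Comm>> view_hash_to_comms = {
      {0x1234, {10, 11, 12, 13}}};
  for (auto const &entry : view_hash_to_comms) {
    launch_per_point(entry.second, [](Comm c) {
      // the real finish_nccl_comms_task would finalize and destroy c here
      (void)c;
    });
  }
  return 0;
}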

void FFModel::clear_graph_search_cache() {
this->graph_search->clear_cache();
this->search->clear_cache();
@@ -6853,6 +6899,21 @@ void register_flexflow_internal_tasks(Runtime *runtime,
registrar);
}
}
+ {
+ TaskVariantRegistrar registrar(NCCL_FINISH_COMMS_TASK_ID,
+ "NCCL Finish Communicators");
+ registrar.add_constraint(ProcessorConstraint(Processor::TOC_PROC));
+ registrar.set_leaf();
+ if (pre_register) {
+ Runtime::preregister_task_variant<Op::finish_nccl_comms_task>(
+ registrar, "NCCL Finish Communicators Task");
+ } else {
+ if (enable_control_replication) {
+ registrar.global_registration = false;
+ }
+ runtime->register_task_variant<Op::finish_nccl_comms_task>(registrar);
+ }
+ }
#endif
// Search
{