From 33596947ef894cd924fe72ed0abc98902e04587a Mon Sep 17 00:00:00 2001 From: Samit <285365963@qq.com> Date: Fri, 23 Aug 2024 11:39:16 +0800 Subject: [PATCH 01/32] lazy_inline to reduce compile time --- examples/opensora_hpcai/opensora/models/stdit/stdit3.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/opensora_hpcai/opensora/models/stdit/stdit3.py b/examples/opensora_hpcai/opensora/models/stdit/stdit3.py index a1294168da..f0ad70bd7a 100644 --- a/examples/opensora_hpcai/opensora/models/stdit/stdit3.py +++ b/examples/opensora_hpcai/opensora/models/stdit/stdit3.py @@ -35,6 +35,9 @@ class STDiT3Block(nn.Cell): + + # to reduce compilation time + @lazy_inline(policy="front") def __init__( self, hidden_size, From db6a4bf98aff5234c3aef9bd4a96acc40eb09ce1 Mon Sep 17 00:00:00 2001 From: Samit <285365963@qq.com> Date: Fri, 23 Aug 2024 12:08:57 +0800 Subject: [PATCH 02/32] lazy_inline to reduce compile time --- examples/opensora_hpcai/opensora/models/stdit/stdit3.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/opensora_hpcai/opensora/models/stdit/stdit3.py b/examples/opensora_hpcai/opensora/models/stdit/stdit3.py index a1294168da..41016efaab 100644 --- a/examples/opensora_hpcai/opensora/models/stdit/stdit3.py +++ b/examples/opensora_hpcai/opensora/models/stdit/stdit3.py @@ -35,6 +35,9 @@ class STDiT3Block(nn.Cell): + + # to reduce compilation time + @ms.lazy_inline(policy="front") def __init__( self, hidden_size, From 6252f314726c846d8fbb1e8efe1c86c87ed9c946 Mon Sep 17 00:00:00 2001 From: Samit <285365963@qq.com> Date: Fri, 23 Aug 2024 15:25:11 +0800 Subject: [PATCH 03/32] leave large mem for communication, max_device_memory 55GB --- examples/opensora_hpcai/scripts/run/run_train_os1.2_stage2.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/opensora_hpcai/scripts/run/run_train_os1.2_stage2.sh b/examples/opensora_hpcai/scripts/run/run_train_os1.2_stage2.sh index 5c6c440bc8..2f9ade3ebc 100644 --- a/examples/opensora_hpcai/scripts/run/run_train_os1.2_stage2.sh +++ b/examples/opensora_hpcai/scripts/run/run_train_os1.2_stage2.sh @@ -18,7 +18,7 @@ python scripts/train.py \ --pretrained_model_path="models/OpenSora-STDiT-v3/opensora_stdit_v3.ckpt" \ --mode=0 \ --jit_level O1 \ ---max_device_memory 59GB \ +--max_device_memory 55GB \ --config configs/opensora-v1-2/train/train_stage2.yaml \ --csv_path datasets/mixkit-100videos/video_caption_train.csv \ --video_folder datasets/mixkit-100videos/mixkit \ From 21fff2c17814e4d4edd318b4b5cd2ee1393e32c8 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Wed, 28 Aug 2024 12:40:21 +0800 Subject: [PATCH 04/32] fix train_steps bug --- .../opensora_hpcai/opensora/models/stdit/stdit3.py | 2 +- examples/opensora_hpcai/scripts/train.py | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/examples/opensora_hpcai/opensora/models/stdit/stdit3.py b/examples/opensora_hpcai/opensora/models/stdit/stdit3.py index f0ad70bd7a..41016efaab 100644 --- a/examples/opensora_hpcai/opensora/models/stdit/stdit3.py +++ b/examples/opensora_hpcai/opensora/models/stdit/stdit3.py @@ -37,7 +37,7 @@ class STDiT3Block(nn.Cell): # to reduce compilation time - @lazy_inline(policy="front") + @ms.lazy_inline(policy="front") def __init__( self, hidden_size, diff --git a/examples/opensora_hpcai/scripts/train.py b/examples/opensora_hpcai/scripts/train.py index 0e42e70590..d86b9104a8 100644 --- a/examples/opensora_hpcai/scripts/train.py +++ b/examples/opensora_hpcai/scripts/train.py @@ -562,17 +562,19 @@ 
def main(args):
     )
 
     # compute total steps and data epochs (in unit of data sink size)
+    if args.dataset_sink_mode and args.sink_size != -1:
+        steps_per_sink = args.sink_size
+    else:
+        steps_per_sink = dataset_size
+
     if args.train_steps == -1:
         assert args.epochs != -1
         total_train_steps = args.epochs * dataset_size
+        sink_epochs = math.ceil(total_train_steps / steps_per_sink)
     else:
         total_train_steps = args.train_steps
-
-    if args.dataset_sink_mode and args.sink_size != -1:
-        steps_per_sink = args.sink_size
-    else:
-        steps_per_sink = dataset_size
-    sink_epochs = math.ceil(total_train_steps / steps_per_sink)
+        # assume one step needs one whole epoch of data, to ensure enough batches are loaded for training
+        sink_epochs = total_train_steps
 
     if args.ckpt_save_steps == -1:
         ckpt_save_interval = args.ckpt_save_interval

From 5b5fe8219e855022a93a176e29b43eeee09b58b6 Mon Sep 17 00:00:00 2001
From: Samit <285365963@qq.com>
Date: Wed, 28 Aug 2024 15:58:03 +0800
Subject: [PATCH 05/32] fix linting

---
 examples/opensora_hpcai/opensora/models/stdit/stdit3.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/examples/opensora_hpcai/opensora/models/stdit/stdit3.py b/examples/opensora_hpcai/opensora/models/stdit/stdit3.py
index 41016efaab..b764d49f55 100644
--- a/examples/opensora_hpcai/opensora/models/stdit/stdit3.py
+++ b/examples/opensora_hpcai/opensora/models/stdit/stdit3.py
@@ -35,7 +35,6 @@


 class STDiT3Block(nn.Cell):
-
     # to reduce compilation time
     @ms.lazy_inline(policy="front")
     def __init__(

From 22aed35bfde95dd3c29bae63212314ae450e1865 Mon Sep 17 00:00:00 2001
From: Samit <285365963@qq.com>
Date: Wed, 28 Aug 2024 17:06:27 +0800
Subject: [PATCH 06/32] add lazy_inline to vae encoder/decoder

---
 examples/opensora_hpcai/opensora/models/vae/modules.py      | 3 +++
 examples/opensora_hpcai/opensora/models/vae/vae_temporal.py | 2 ++
 2 files changed, 5 insertions(+)

diff --git a/examples/opensora_hpcai/opensora/models/vae/modules.py b/examples/opensora_hpcai/opensora/models/vae/modules.py
index 539afc26c9..489b22950c 100644
--- a/examples/opensora_hpcai/opensora/models/vae/modules.py
+++ b/examples/opensora_hpcai/opensora/models/vae/modules.py
@@ -2,6 +2,7 @@

 import numpy as np

+import mindspore as ms
 from mindspore import nn, ops

 _logger = logging.getLogger(__name__)
@@ -177,6 +178,7 @@ def make_attn(in_channels, attn_type="vanilla"):

 # used in vae
 class Encoder(nn.Cell):
+    @ms.lazy_inline()
     def __init__(
         self,
         *,
@@ -299,6 +301,7 @@ def construct(self, x):

 # used in vae
 class Decoder(nn.Cell):
+    @ms.lazy_inline()
     def __init__(
         self,
         *,

diff --git a/examples/opensora_hpcai/opensora/models/vae/vae_temporal.py b/examples/opensora_hpcai/opensora/models/vae/vae_temporal.py
index 733f1c20c8..8465c616cc 100644
--- a/examples/opensora_hpcai/opensora/models/vae/vae_temporal.py
+++ b/examples/opensora_hpcai/opensora/models/vae/vae_temporal.py
@@ -150,6 +150,7 @@ def get_activation_fn(activation):
 class Encoder(nn.Cell):
     """Encoder Blocks."""

+    @ms.lazy_inline()
     def __init__(
         self,
         in_out_channels=4,
@@ -260,6 +261,7 @@ def construct(self, x):
 class Decoder(nn.Cell):
     """Decoder Blocks."""

+    @ms.lazy_inline()
     def __init__(
         self,
         in_out_channels=4,

From 6c220c74fbcc3f7fca466af65adc2b07303e60c7 Mon Sep 17 00:00:00 2001
From: samithuang <285365963@qq.com>
Date: Sat, 31 Aug 2024 18:34:23 +0800
Subject: [PATCH 07/32] rm lazy inline for vae due to perf drop

---
 examples/opensora_hpcai/opensora/models/vae/modules.py        | 2 +-
 examples/opensora_hpcai/opensora/models/vae/vae_temporal.py   | 2 +-
examples/opensora_hpcai/opensora/schedulers/rectified_flow.py | 3 +++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/examples/opensora_hpcai/opensora/models/vae/modules.py b/examples/opensora_hpcai/opensora/models/vae/modules.py index 489b22950c..bb8c0a7d2b 100644 --- a/examples/opensora_hpcai/opensora/models/vae/modules.py +++ b/examples/opensora_hpcai/opensora/models/vae/modules.py @@ -178,7 +178,7 @@ def make_attn(in_channels, attn_type="vanilla"): # used in vae class Encoder(nn.Cell): - @ms.lazy_inline() + # @ms.lazy_inline() def __init__( self, *, diff --git a/examples/opensora_hpcai/opensora/models/vae/vae_temporal.py b/examples/opensora_hpcai/opensora/models/vae/vae_temporal.py index 8465c616cc..2089a8a9d5 100644 --- a/examples/opensora_hpcai/opensora/models/vae/vae_temporal.py +++ b/examples/opensora_hpcai/opensora/models/vae/vae_temporal.py @@ -150,7 +150,7 @@ def get_activation_fn(activation): class Encoder(nn.Cell): """Encoder Blocks.""" - @ms.lazy_inline() + # @ms.lazy_inline() def __init__( self, in_out_channels=4, diff --git a/examples/opensora_hpcai/opensora/schedulers/rectified_flow.py b/examples/opensora_hpcai/opensora/schedulers/rectified_flow.py index 445e105ad4..6393728e16 100644 --- a/examples/opensora_hpcai/opensora/schedulers/rectified_flow.py +++ b/examples/opensora_hpcai/opensora/schedulers/rectified_flow.py @@ -6,7 +6,9 @@ from typing_extensions import Literal # FIXME: python 3.7 from tqdm import tqdm +import numpy as np +import mindspore as ms from mindspore import Tensor, dtype, ops from ..utils.distributions import LogisticNormal @@ -72,6 +74,7 @@ def __call__( mask_t = frames_mask * self.num_timesteps x0 = z.copy() x_noise = self.scheduler.add_noise(x0, ops.randn_like(x0), t) + # x_noise = self.scheduler.add_noise(x0, ms.Tensor(np.random.randn(*x0.shape), dtype=ms.float32), t) model_kwargs["frames_mask"] = mask_t_upper = mask_t >= t.unsqueeze(1) mask_add_noise = (mask_t_upper * (1 - noise_added)).astype(dtype.bool_) From 60e7705b08555b4996790ac284719c65305fcb8c Mon Sep 17 00:00:00 2001 From: Samit <285365963@qq.com> Date: Mon, 2 Sep 2024 18:18:01 +0800 Subject: [PATCH 08/32] rm lazy_inline in vae --- examples/opensora_hpcai/opensora/models/vae/modules.py | 5 ++--- examples/opensora_hpcai/opensora/models/vae/vae_temporal.py | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/examples/opensora_hpcai/opensora/models/vae/modules.py b/examples/opensora_hpcai/opensora/models/vae/modules.py index 489b22950c..0006ffca22 100644 --- a/examples/opensora_hpcai/opensora/models/vae/modules.py +++ b/examples/opensora_hpcai/opensora/models/vae/modules.py @@ -178,7 +178,7 @@ def make_attn(in_channels, attn_type="vanilla"): # used in vae class Encoder(nn.Cell): - @ms.lazy_inline() + # @ms.lazy_inline() def __init__( self, *, @@ -299,9 +299,8 @@ def construct(self, x): return h -# used in vae class Decoder(nn.Cell): - @ms.lazy_inline() + # @ms.lazy_inline() def __init__( self, *, diff --git a/examples/opensora_hpcai/opensora/models/vae/vae_temporal.py b/examples/opensora_hpcai/opensora/models/vae/vae_temporal.py index 8465c616cc..1151a36446 100644 --- a/examples/opensora_hpcai/opensora/models/vae/vae_temporal.py +++ b/examples/opensora_hpcai/opensora/models/vae/vae_temporal.py @@ -150,7 +150,7 @@ def get_activation_fn(activation): class Encoder(nn.Cell): """Encoder Blocks.""" - @ms.lazy_inline() + # @ms.lazy_inline() def __init__( self, in_out_channels=4, @@ -261,7 +261,7 @@ def construct(self, x): class Decoder(nn.Cell): """Decoder Blocks.""" - 
@ms.lazy_inline() + # @ms.lazy_inline() def __init__( self, in_out_channels=4, From b3a4ff9b2115c7501c68cc1b1ed4b3ee73532d97 Mon Sep 17 00:00:00 2001 From: Samit <285365963@qq.com> Date: Mon, 2 Sep 2024 19:22:15 +0800 Subject: [PATCH 09/32] only require decord when backend selected --- .../opensora/datasets/video_dataset_refactored.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/opensora_hpcai/opensora/datasets/video_dataset_refactored.py b/examples/opensora_hpcai/opensora/datasets/video_dataset_refactored.py index a0be3759d2..e4a637999e 100644 --- a/examples/opensora_hpcai/opensora/datasets/video_dataset_refactored.py +++ b/examples/opensora_hpcai/opensora/datasets/video_dataset_refactored.py @@ -10,13 +10,11 @@ import cv2 import numpy as np -from decord import VideoReader from tqdm import tqdm import mindspore as ms from mindspore.dataset.transforms import Compose from mindspore.dataset.vision import CenterCrop, Inter, Normalize - from mindone.data.video_reader import VideoReader as VideoReader_CV2 from .bucket import Bucket @@ -252,6 +250,7 @@ def _get_item(self, idx: int) -> Tuple[Any, ...]: else: if self.video_backend == "decord": + from decord import VideoReader reader = VideoReader(video_path) min_length = self._min_length video_length = len(reader) From 8f918f64f2e991f38685e3e3e70dc08b7a9bf397 Mon Sep 17 00:00:00 2001 From: Samit <285365963@qq.com> Date: Tue, 3 Sep 2024 11:49:38 +0800 Subject: [PATCH 10/32] fix logging --- mindone/trainers/callback.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mindone/trainers/callback.py b/mindone/trainers/callback.py index f1d0d6f5ac..bdd2e7c8a8 100755 --- a/mindone/trainers/callback.py +++ b/mindone/trainers/callback.py @@ -9,7 +9,7 @@ from .checkpoint import CheckpointManager from .recorder import PerfRecorder -_logger = logging.getLogger(__name__) +_logger = logging.getLogger("") __all__ = ["OverflowMonitor", "EvalSaveCallback", "ProfilerCallback", "StopAtStepCallback"] From a52d80e932cb40a2b5a1401e420626122bc8478f Mon Sep 17 00:00:00 2001 From: Samit <285365963@qq.com> Date: Tue, 3 Sep 2024 11:58:26 +0800 Subject: [PATCH 11/32] fix logging --- .../opensora/datasets/video_dataset_refactored.py | 2 ++ examples/opensora_hpcai/opensora/models/vae/modules.py | 2 +- examples/opensora_hpcai/opensora/schedulers/rectified_flow.py | 4 ++-- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/examples/opensora_hpcai/opensora/datasets/video_dataset_refactored.py b/examples/opensora_hpcai/opensora/datasets/video_dataset_refactored.py index e4a637999e..8c91f917c0 100644 --- a/examples/opensora_hpcai/opensora/datasets/video_dataset_refactored.py +++ b/examples/opensora_hpcai/opensora/datasets/video_dataset_refactored.py @@ -15,6 +15,7 @@ import mindspore as ms from mindspore.dataset.transforms import Compose from mindspore.dataset.vision import CenterCrop, Inter, Normalize + from mindone.data.video_reader import VideoReader as VideoReader_CV2 from .bucket import Bucket @@ -251,6 +252,7 @@ def _get_item(self, idx: int) -> Tuple[Any, ...]: else: if self.video_backend == "decord": from decord import VideoReader + reader = VideoReader(video_path) min_length = self._min_length video_length = len(reader) diff --git a/examples/opensora_hpcai/opensora/models/vae/modules.py b/examples/opensora_hpcai/opensora/models/vae/modules.py index 0006ffca22..7ebf4d94f5 100644 --- a/examples/opensora_hpcai/opensora/models/vae/modules.py +++ b/examples/opensora_hpcai/opensora/models/vae/modules.py @@ -2,7 +2,7 @@ 
import numpy as np -import mindspore as ms +# import mindspore as ms from mindspore import nn, ops _logger = logging.getLogger(__name__) diff --git a/examples/opensora_hpcai/opensora/schedulers/rectified_flow.py b/examples/opensora_hpcai/opensora/schedulers/rectified_flow.py index 6393728e16..0464dd5387 100644 --- a/examples/opensora_hpcai/opensora/schedulers/rectified_flow.py +++ b/examples/opensora_hpcai/opensora/schedulers/rectified_flow.py @@ -5,10 +5,10 @@ except ImportError: from typing_extensions import Literal # FIXME: python 3.7 +# import numpy as np +# import mindspore as ms from tqdm import tqdm -import numpy as np -import mindspore as ms from mindspore import Tensor, dtype, ops from ..utils.distributions import LogisticNormal From a0d6632231007682fb7354476c778f3452f87bbe Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Tue, 3 Sep 2024 15:48:29 +0800 Subject: [PATCH 12/32] x1: rm duplicated norm --- .../opensora_hpcai/opensora/models/stdit/stdit3.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/examples/opensora_hpcai/opensora/models/stdit/stdit3.py b/examples/opensora_hpcai/opensora/models/stdit/stdit3.py index b764d49f55..6a4b20cabb 100644 --- a/examples/opensora_hpcai/opensora/models/stdit/stdit3.py +++ b/examples/opensora_hpcai/opensora/models/stdit/stdit3.py @@ -102,9 +102,10 @@ def construct( ) # modulate (attention) - x_m = t2i_modulate(self.norm1(x), shift_msa, scale_msa) + norm1 = self.norm1(x) + x_m = t2i_modulate(norm1, shift_msa, scale_msa) # frames mask branch - x_m_zero = t2i_modulate(self.norm1(x), shift_msa_zero, scale_msa_zero) + x_m_zero = t2i_modulate(norm1, shift_msa_zero, scale_msa_zero) x_m = t_mask_select(frames_mask, x_m, x_m_zero, T, S) # attention @@ -130,9 +131,10 @@ def construct( x = x + self.cross_attn(x, y, mask) # modulate (MLP) - x_m = t2i_modulate(self.norm2(x), shift_mlp, scale_mlp) + norm2 = self.norm2(x) + x_m = t2i_modulate(norm2, shift_mlp, scale_mlp) # frames mask branch - x_m_zero = t2i_modulate(self.norm2(x), shift_mlp_zero, scale_mlp_zero) + x_m_zero = t2i_modulate(norm2, shift_mlp_zero, scale_mlp_zero) x_m = t_mask_select(frames_mask, x_m, x_m_zero, T, S) # MLP From 1b5f3f1bd0cf6a563b8b7fbcd55833922b5b049c Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Tue, 3 Sep 2024 15:48:39 +0800 Subject: [PATCH 13/32] x-1: use ops.rms_norm, mint.layer_norm --- .../opensora/models/layers/blocks.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/examples/opensora_hpcai/opensora/models/layers/blocks.py b/examples/opensora_hpcai/opensora/models/layers/blocks.py index 86c1ded846..f7f994e09a 100644 --- a/examples/opensora_hpcai/opensora/models/layers/blocks.py +++ b/examples/opensora_hpcai/opensora/models/layers/blocks.py @@ -5,7 +5,7 @@ import numpy as np import mindspore as ms -from mindspore import Parameter, Tensor, nn, ops +from mindspore import Parameter, Tensor, mint, nn, ops from mindspore.common.initializer import initializer from mindone.models.modules.flash_attention import FLASH_IS_AVAILABLE, MSFlashAttention @@ -26,9 +26,10 @@ def __init__(self, hidden_size, eps=1e-6): self.variance_epsilon = eps def construct(self, hidden_states: Tensor): - variance = hidden_states.pow(2).mean(-1, keep_dims=True) - hidden_states = hidden_states * ops.rsqrt(variance + self.variance_epsilon) - return self.gamma * hidden_states + # variance = hidden_states.pow(2).mean(-1, keep_dims=True) + # hidden_states = hidden_states * ops.rsqrt(variance + 
self.variance_epsilon) + # return self.gamma * hidden_states + return ops.rms_norm(hidden_states, self.gamma, self.variance_epsilon)[0] class Attention(nn.Cell): @@ -325,10 +326,14 @@ def __init__(self, normalized_shape, eps=1e-5, elementwise_affine: bool = True, else: self.gamma = ops.ones(normalized_shape, dtype=dtype) self.beta = ops.zeros(normalized_shape, dtype=dtype) - self.layer_norm = ops.LayerNorm(-1, -1, epsilon=eps) + # self.layer_norm = ops.LayerNorm(-1, -1, epsilon=eps) def construct(self, x: Tensor): x, _, _ = self.layer_norm(x, self.gamma, self.beta) + + normalized_shape = x.shape[-1:] + x = mint.nn.functional.layer_norm(x, normalized_shape, self.gamma, self.beta, self.eps) + return x From 92051006a8d33ceee7af78d87de3a68d39247756 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Tue, 3 Sep 2024 15:49:42 +0800 Subject: [PATCH 14/32] x-2: rm hs list in vae encode --- .../opensora_hpcai/opensora/models/vae/modules.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/examples/opensora_hpcai/opensora/models/vae/modules.py b/examples/opensora_hpcai/opensora/models/vae/modules.py index 7ebf4d94f5..94fa68f7fc 100644 --- a/examples/opensora_hpcai/opensora/models/vae/modules.py +++ b/examples/opensora_hpcai/opensora/models/vae/modules.py @@ -276,6 +276,7 @@ def construct(self, x): temb = None # downsampling + ''' hs = [self.conv_in(x)] for i_level in range(self.num_resolutions): for i_block in range(self.num_res_blocks): @@ -288,6 +289,19 @@ def construct(self, x): # middle h = hs[-1] + ''' + hs = self.conv_in(x) + for i_level in range(self.num_resolutions): + for i_block in range(self.num_res_blocks): + h = self.down[i_level].block[i_block](hs, temb) + if len(self.down[i_level].attn) > 0: + h = self.down[i_level].attn[i_block](h) + hs = h + if i_level != self.num_resolutions - 1: + hs = self.down[i_level].downsample(hs) + + # middle + h = hs h = self.mid.block_1(h, temb) h = self.mid.attn_1(h) h = self.mid.block_2(h, temb) From c444d622ef40d81ba157cab7dbfd7254532b26e1 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Tue, 3 Sep 2024 15:50:47 +0800 Subject: [PATCH 15/32] x-3: use self-impl repeat interleave --- .../opensora/models/layers/operation_selector.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/opensora_hpcai/opensora/models/layers/operation_selector.py b/examples/opensora_hpcai/opensora/models/layers/operation_selector.py index b50c7f655d..3ca748c00c 100644 --- a/examples/opensora_hpcai/opensora/models/layers/operation_selector.py +++ b/examples/opensora_hpcai/opensora/models/layers/operation_selector.py @@ -56,7 +56,8 @@ def get_repeat_interleave_op(): # provide better performance for static shape in graph mode return ops.repeat_interleave else: - return repeat_interleave_ext + # FIXME: check overflow + return repeat_interleave_ext_v2 def get_chunk_op(): From f59d16649ed352d421395dff57b4e0ee481488fe Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Tue, 3 Sep 2024 15:56:09 +0800 Subject: [PATCH 16/32] fix layernorm --- examples/opensora_hpcai/opensora/models/layers/blocks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/opensora_hpcai/opensora/models/layers/blocks.py b/examples/opensora_hpcai/opensora/models/layers/blocks.py index f7f994e09a..c4b17422c8 100644 --- a/examples/opensora_hpcai/opensora/models/layers/blocks.py +++ b/examples/opensora_hpcai/opensora/models/layers/blocks.py @@ -329,7 +329,7 @@ def __init__(self, normalized_shape, 
eps=1e-5, elementwise_affine: bool = True, # self.layer_norm = ops.LayerNorm(-1, -1, epsilon=eps) def construct(self, x: Tensor): - x, _, _ = self.layer_norm(x, self.gamma, self.beta) + # x, _, _ = self.layer_norm(x, self.gamma, self.beta) normalized_shape = x.shape[-1:] x = mint.nn.functional.layer_norm(x, normalized_shape, self.gamma, self.beta, self.eps) From 4594f9cb741dde93cb617ff5ca49df8c64efa2f7 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Thu, 5 Sep 2024 11:16:03 +0800 Subject: [PATCH 17/32] record shape --- examples/opensora_hpcai/scripts/train.py | 6 +++--- mindone/trainers/recorder.py | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/examples/opensora_hpcai/scripts/train.py b/examples/opensora_hpcai/scripts/train.py index d86b9104a8..8be1da397f 100644 --- a/examples/opensora_hpcai/scripts/train.py +++ b/examples/opensora_hpcai/scripts/train.py @@ -821,7 +821,7 @@ def main(args): ckpt_manager = CheckpointManager(ckpt_dir, "latest_k", k=args.ckpt_max_keep) if not os.path.exists(ckpt_dir): os.makedirs(ckpt_dir) - perf_columns = ["step", "loss", "train_time(s)"] + perf_columns = ["step", "loss", "train_time(s)", "shape"] output_dir = ckpt_dir.replace("/ckpt", "") if start_epoch == 0: record = PerfRecorder(output_dir, metric_names=perf_columns) @@ -846,13 +846,13 @@ def main(args): # print(data[0].shape) loss_val = float(loss.asnumpy()) logger.info( - f"Epoch {epoch}, Step {step}, loss {loss_val:.5f}, Global step {global_step}, Step time {step_time*1000:.2f}ms" + f"Epoch {epoch}, Step {step}, loss {loss_val:.5f}, Global step {global_step}, Shape: {tuple(data[0].shape)}, Step time {step_time*1000:.2f}ms" ) if overflow: logger.warning("overflow detected") if rank_id == 0: - step_pref_value = [global_step, loss_val, step_time] + step_pref_value = [global_step, loss_val, step_time, tuple(data[0].shape)] record.add(*step_pref_value) # save and eval in step if save_by_step and rank_id == 0: diff --git a/mindone/trainers/recorder.py b/mindone/trainers/recorder.py index ce43ce98e9..61e509d18c 100644 --- a/mindone/trainers/recorder.py +++ b/mindone/trainers/recorder.py @@ -42,6 +42,8 @@ def add(self, step, *measures): if isinstance(m, float) or isinstance(m, np.ndarray): line += f"{m:.7f}" + elif isinstance(m, tuple): + line += f"{m}" elif m is None: line += "NA" else: From 8855f4e52ec69ea79a1d5ad8360d591b34490770 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Thu, 5 Sep 2024 11:44:07 +0800 Subject: [PATCH 18/32] balance bucket config for A+M --- .../opensora-v1-2/train/train_stage2_ms.yaml | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 examples/opensora_hpcai/configs/opensora-v1-2/train/train_stage2_ms.yaml diff --git a/examples/opensora_hpcai/configs/opensora-v1-2/train/train_stage2_ms.yaml b/examples/opensora_hpcai/configs/opensora-v1-2/train/train_stage2_ms.yaml new file mode 100644 index 0000000000..50740733b5 --- /dev/null +++ b/examples/opensora_hpcai/configs/opensora-v1-2/train/train_stage2_ms.yaml @@ -0,0 +1,84 @@ +# model +model_version: v1.2 +pretrained_model_path: PATH_TO_YOUR_MODEL +model_max_length: 300 +freeze_y_embedder: True + +noise_scheduler: rflow +sample_method: logit-normal +use_timestep_transform: True + +vae_type: OpenSoraVAE_V1_2 +vae_checkpoint: models/OpenSora-VAE-v1.2/model.ckpt +vae_dtype: bf16 +vae_micro_batch_size: 4 +vae_micro_frame_size: 17 # keep it unchanged for the best results + +enable_flash_attention: True +use_recompute: True + +# data +num_parallel_workers: 2 
+num_workers_dataset: 2 +prefetch_size: 2 +max_rowsize: 256 + +# mindspore params, refer to https://www.mindspore.cn/docs/zh-CN/r2.3.1/api_python/mindspore/mindspore.set_context.html +max_device_memory: "59GB" +jit_level: "O1" +manual_pad: True + +# precision +amp_level: "O2" +dtype: bf16 +loss_scaler_type: static +init_loss_scale: 1 + +# training hyper-params +scheduler: "constant" +start_learning_rate: 1.e-4 +end_learning_rate: 1.e-4 +# warmup_steps: 1000 + +clip_grad: True +max_grad_norm: 1.0 +use_ema: True +# ema_decay: 0.99 # default 0.9999 gives better result in our experiments + +optim: "adamw_re" +optim_eps: 1e-15 +weight_decay: 0. + +# epochs: 2 +train_steps: 23000 +ckpt_save_steps: 500 + +mask_ratios: + random: 0.005 + interpolate: 0.002 + quarter_random: 0.007 + quarter_head: 0.002 + quarter_tail: 0.002 + quarter_head_tail: 0.002 + image_random: 0.0 + image_head: 0.22 + image_tail: 0.005 + image_head_tail: 0.005 + +bucket_config: + # Structure: "resolution": { num_frames: [ keep_prob, batch_size ] } + # Setting [ keep_prob, batch_size ] to [ 0.0, 0 ] forces longer videos into smaller resolution buckets + "144p": { 1: [ 1.0, 475 ], 51: [ 1.0, 40 ], 102: [ [ 1.0, 0.33 ], 20 ], 204: [ [ 1.0, 0.1 ], 10 ], 408: [ [ 1.0, 0.1 ], 6 ] } + "256": { 1: [ 0.4, 297 ], 51: [ 0.5, 20 ], 102: [ [ 0.5, 0.33 ], 10 ], 204: [ [ 0.5, 1.0 ], 5 ], 408: [ [ 0.5, 1.0 ], 2 ] } + "240p": { 1: [ 0.3, 297 ], 51: [ 0.4, 16 ], 102: [ [ 0.4, 0.33 ], 8 ], 204: [ [ 0.4, 1.0 ], 4 ], 408: [ [ 0.4, 1.0 ], 2 ] } + "360p": { 1: [ 0.5, 141 ], 51: [ 0.15, 6 ], 102: [ [ 0.3, 0.5 ], 3 ], 204: [ [ 0.3, 1.0 ], 2 ], 408: [ [ 0.5, 0.5 ], 1 ] } + "512": { 1: [ 0.4, 141 ], 51: [ 0.15, 6 ], 102: [ [ 0.2, 0.4 ], 3 ], 204: [ [ 0.2, 1.0 ], 1 ], 408: [ [ 0.4, 0.5 ], 1 ] } + "480p": { 1: [ 0.5, 89 ], 51: [ 0.2, 5 ], 102: [ 0.2, 2 ], 204: [ 0.1, 1 ] } + "720p": { 1: [ 0.1, 36 ], 51: [ 0.03, 1 ] } + "1024": { 1: [ 0.1, 36 ], 51: [ 0.02, 1 ] } + "1080p": { 1: [ 0.01, 5 ] } + "2048": { 1: [ 0.01, 5 ] } + + +# ---------- Validation ---------- +validate: False From 560cd6754ded09eda55143240b23ad108c46e479 Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Thu, 5 Sep 2024 14:44:32 +0800 Subject: [PATCH 19/32] revert repeat interleave for safe --- .../opensora/models/layers/operation_selector.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/opensora_hpcai/opensora/models/layers/operation_selector.py b/examples/opensora_hpcai/opensora/models/layers/operation_selector.py index 3ca748c00c..269f3966c3 100644 --- a/examples/opensora_hpcai/opensora/models/layers/operation_selector.py +++ b/examples/opensora_hpcai/opensora/models/layers/operation_selector.py @@ -56,8 +56,9 @@ def get_repeat_interleave_op(): # provide better performance for static shape in graph mode return ops.repeat_interleave else: - # FIXME: check overflow - return repeat_interleave_ext_v2 + # FIXME: check overflow for v2 + # return repeat_interleave_ext_v2 + return repeat_interleave_ext def get_chunk_op(): From 1f72aa5e72f0c51ae39200052328d9636362c44f Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Thu, 5 Sep 2024 14:54:23 +0800 Subject: [PATCH 20/32] increase bs for 256 res --- .../configs/opensora-v1-2/train/train_stage2_ms.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/opensora_hpcai/configs/opensora-v1-2/train/train_stage2_ms.yaml b/examples/opensora_hpcai/configs/opensora-v1-2/train/train_stage2_ms.yaml index 50740733b5..b5f3c6bc59 100644 --- 
a/examples/opensora_hpcai/configs/opensora-v1-2/train/train_stage2_ms.yaml
+++ b/examples/opensora_hpcai/configs/opensora-v1-2/train/train_stage2_ms.yaml
@@ -69,7 +69,7 @@ bucket_config:
   # Structure: "resolution": { num_frames: [ keep_prob, batch_size ] }
   # Setting [ keep_prob, batch_size ] to [ 0.0, 0 ] forces longer videos into smaller resolution buckets
   "144p": { 1: [ 1.0, 475 ], 51: [ 1.0, 40 ], 102: [ [ 1.0, 0.33 ], 20 ], 204: [ [ 1.0, 0.1 ], 10 ], 408: [ [ 1.0, 0.1 ], 6 ] }
-  "256": { 1: [ 0.4, 297 ], 51: [ 0.5, 20 ], 102: [ [ 0.5, 0.33 ], 10 ], 204: [ [ 0.5, 1.0 ], 5 ], 408: [ [ 0.5, 1.0 ], 2 ] }
+  "256": { 1: [ 0.4, 297 ], 51: [ 0.5, 24 ], 102: [ [ 0.5, 0.33 ], 12 ], 204: [ [ 0.5, 1.0 ], 6 ], 408: [ [ 0.5, 1.0 ], 2 ] }
   "240p": { 1: [ 0.3, 297 ], 51: [ 0.4, 16 ], 102: [ [ 0.4, 0.33 ], 8 ], 204: [ [ 0.4, 1.0 ], 4 ], 408: [ [ 0.4, 1.0 ], 2 ] }
   "360p": { 1: [ 0.5, 141 ], 51: [ 0.15, 6 ], 102: [ [ 0.3, 0.5 ], 3 ], 204: [ [ 0.3, 1.0 ], 2 ], 408: [ [ 0.5, 0.5 ], 1 ] }
   "512": { 1: [ 0.4, 141 ], 51: [ 0.15, 6 ], 102: [ [ 0.2, 0.4 ], 3 ], 204: [ [ 0.2, 1.0 ], 1 ], 408: [ [ 0.4, 0.5 ], 1 ] }

From b7579170299375f8d320d639287ff5882b659b0b Mon Sep 17 00:00:00 2001
From: samithuang <285365963@qq.com>
Date: Thu, 5 Sep 2024 15:53:20 +0800
Subject: [PATCH 21/32] add shape step time analysis script

---
 .../tools/analyze_buckt_result.py             | 40 +++++++++++++++++++
 1 file changed, 40 insertions(+)
 create mode 100644 examples/opensora_hpcai/tools/analyze_buckt_result.py

diff --git a/examples/opensora_hpcai/tools/analyze_buckt_result.py b/examples/opensora_hpcai/tools/analyze_buckt_result.py
new file mode 100644
index 0000000000..d5f144ed6f
--- /dev/null
+++ b/examples/opensora_hpcai/tools/analyze_buckt_result.py
@@ -0,0 +1,40 @@
+import pandas as pd
+import argparse
+
+# result_path = 'outputs/analyze_os1.2_stage2_vcg200_ms231/merged_result.log'
+
+def analyze(result_path, save_path):
+    warmup_steps = 50
+    max_step_time = 100    # step time larger than this value will be dropped, considering checkpoint saving
+
+    data = pd.read_csv(result_path, sep='\t')
+
+    # filter warmup stage
+    data = data.iloc[warmup_steps-1:]
+
+    # filter out outliers
+    data = data[data['train_time(s)'] < max_step_time]

From: samithuang <285365963@qq.com>
Date: Mon, 9 Sep 2024 16:31:55 +0800
Subject: [PATCH 22/32] fix stop

---
 examples/opensora_hpcai/scripts/train.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/examples/opensora_hpcai/scripts/train.py b/examples/opensora_hpcai/scripts/train.py
index 8be1da397f..f1413a9a9f 100644
--- a/examples/opensora_hpcai/scripts/train.py
+++ b/examples/opensora_hpcai/scripts/train.py
@@ -733,11 +733,12 @@ def main(args):
             resume=args.resume,
         )
         callbacks.extend([save_cb, rec_cb])
-        if args.train_steps > 0:
-            callbacks.append(StopAtStepCallback(args.train_steps, global_step=cur_iter))
         if args.profile:
             callbacks.append(ProfilerCallbackEpoch(2, 3, "./profile_data"))
+    if args.train_steps > 0:
+        callbacks.append(StopAtStepCallback(args.train_steps, global_step=cur_iter))
+
     # 5.
log and save config if rank_id == 0: if vae is not None: From 94cbbb4a6ecb1e2f8bb2f13048dcfe96bfdb312f Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Mon, 9 Sep 2024 17:00:56 +0800 Subject: [PATCH 23/32] rm pdb --- .../opensora_hpcai/scripts/test_datasource.py | 882 ++++++++++++++++++ 1 file changed, 882 insertions(+) create mode 100644 examples/opensora_hpcai/scripts/test_datasource.py diff --git a/examples/opensora_hpcai/scripts/test_datasource.py b/examples/opensora_hpcai/scripts/test_datasource.py new file mode 100644 index 0000000000..f45ff442a7 --- /dev/null +++ b/examples/opensora_hpcai/scripts/test_datasource.py @@ -0,0 +1,882 @@ +""" +STDiT training script +""" +import datetime +import logging +import math +import os +import sys +import time +from typing import Optional, Tuple + +import yaml + +import mindspore as ms +from mindspore import nn +from mindspore._c_expression import reset_op_id +from mindspore.communication.management import get_group_size, get_rank, init +from mindspore.nn.wrap.loss_scale import DynamicLossScaleUpdateCell +from mindspore.train.callback import TimeMonitor + +__dir__ = os.path.dirname(os.path.abspath(__file__)) +mindone_lib_path = os.path.abspath(os.path.join(__dir__, "../../../")) +sys.path.insert(0, mindone_lib_path) +sys.path.insert(0, os.path.abspath(os.path.join(__dir__, ".."))) +from args_train import parse_args +from opensora.datasets.aspect import ASPECT_RATIOS, get_image_size +from opensora.models.layers.operation_selector import set_dynamic_mode +from opensora.models.stdit import STDiT2_XL_2, STDiT3_XL_2, STDiT_XL_2 +from opensora.models.vae.vae import SD_CONFIG, OpenSoraVAE_V1_2, VideoAutoencoderKL +from opensora.pipelines import ( + DiffusionWithLoss, + DiffusionWithLossFiTLike, + RFlowDiffusionWithLoss, + RFlowEvalDiffusionWithLoss, +) +from opensora.schedulers.iddpm import create_diffusion +from opensora.utils.amp import auto_mixed_precision +from opensora.utils.callbacks import EMAEvalSwapCallback, PerfRecorderCallback +from opensora.utils.ema import EMA, save_ema_ckpts +from opensora.utils.metrics import BucketLoss +from opensora.utils.model_utils import WHITELIST_OPS, Model +from opensora.utils.resume import flush_from_cache, get_resume_ckpt, get_resume_states, resume_train_net, save_train_net + +from mindone.trainers.callback import EvalSaveCallback, OverflowMonitor, ProfilerCallbackEpoch, StopAtStepCallback +from mindone.trainers.checkpoint import CheckpointManager +from mindone.trainers.lr_schedule import create_scheduler +from mindone.trainers.optim import create_optimizer +from mindone.trainers.recorder import PerfRecorder +from mindone.trainers.train_step import TrainOneStepWrapper +from mindone.utils.logger import set_logger +from mindone.utils.params import count_params +from mindone.utils.seed import set_random_seed + +os.environ["HCCL_CONNECT_TIMEOUT"] = "6000" +os.environ["MS_ASCEND_CHECK_OVERFLOW_MODE"] = "INFNAN_MODE" + +logger = logging.getLogger(__name__) + + +def init_env( + mode: int = ms.GRAPH_MODE, + seed: int = 42, + distributed: bool = False, + max_device_memory: str = None, + device_target: str = "Ascend", + parallel_mode: str = "data", + jit_level: str = "O0", + global_bf16: bool = False, + dynamic_shape: bool = False, + debug: bool = False, +) -> Tuple[int, int]: + """ + Initialize MindSpore environment. + + Args: + mode: MindSpore execution mode. Default is 0 (ms.GRAPH_MODE). + seed: The seed value for reproducibility. Default is 42. + distributed: Whether to enable distributed training. 
Default is False. + Returns: + A tuple containing the device ID, rank ID and number of devices. + """ + set_random_seed(seed) + + if debug and mode == ms.GRAPH_MODE: # force PyNative mode when debugging + logger.warning("Debug mode is on, switching execution mode to PyNative.") + mode = ms.PYNATIVE_MODE + + if max_device_memory is not None: + ms.set_context(max_device_memory=max_device_memory) + + if distributed: + ms.set_context( + mode=mode, + device_target=device_target, + ) + if parallel_mode == "optim": + print("use optim parallel") + ms.set_auto_parallel_context( + parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, + enable_parallel_optimizer=True, + ) + init() + device_num = get_group_size() + rank_id = get_rank() + else: + init() + device_num = get_group_size() + rank_id = get_rank() + logger.debug(f"rank_id: {rank_id}, device_num: {device_num}") + ms.reset_auto_parallel_context() + + ms.set_auto_parallel_context( + parallel_mode=ms.ParallelMode.DATA_PARALLEL, + gradients_mean=True, + device_num=device_num, + ) + + var_info = ["device_num", "rank_id", "device_num / 8", "rank_id / 8"] + var_value = [device_num, rank_id, int(device_num / 8), int(rank_id / 8)] + logger.info(dict(zip(var_info, var_value))) + + else: + device_num = 1 + rank_id = 0 + ms.set_context( + mode=mode, + device_target=device_target, + pynative_synchronize=debug, + ) + + try: + if jit_level in ["O0", "O1", "O2"]: + ms.set_context(jit_config={"jit_level": jit_level}) + else: + logger.warning( + f"Unsupported jit_level: {jit_level}. The framework will automatically select the execution mode." + ) + except Exception: + logger.warning( + "The current jit_level is not suitable because current MindSpore version or mode does not match," + "please ensure the MindSpore version >= ms2.3_0615, and use GRAPH_MODE." + ) + + if global_bf16: + ms.set_context(ascend_config={"precision_mode": "allow_mix_precision_bf16"}) + + if dynamic_shape: + logger.info("Dynamic shape mode enabled, repeat_interleave/split/chunk will be called from mint module") + set_dynamic_mode(True) + if mode == 0: + # FIXME: this is a temp fix for dynamic shape training in graph mode. may remove in future version. 
+ # can append adamw fusion flag if use nn.AdamW optimzation for acceleration + ms.set_context(graph_kernel_flags="--disable_packet_ops=Reshape") + + return rank_id, device_num + + +def set_all_reduce_fusion( + params, + split_num: int = 7, + distributed: bool = False, + parallel_mode: str = "data", +) -> None: + """Set allreduce fusion strategy by split_num.""" + + if distributed and parallel_mode == "data": + all_params_num = len(params) + step = all_params_num // split_num + split_list = [i * step for i in range(1, split_num)] + split_list.append(all_params_num - 1) + logger.info(f"Distribute config set: dall_params_num: {all_params_num}, set all_reduce_fusion: {split_list}") + ms.set_auto_parallel_context(all_reduce_fusion_config=split_list) + + +def initialize_dataset( + args, + csv_path, + video_folder, + text_embed_folder, + vae_latent_folder, + batch_size, + img_h, + img_w, + latte_model, + vae, + bucket_config: Optional[dict] = None, + validation: bool = False, + device_num: int = 1, + rank_id: int = 0, +): + if args.model_version == "v1": + from opensora.datasets.t2v_dataset import create_dataloader + + ds_config = dict( + csv_path=csv_path, + video_folder=video_folder, + text_emb_folder=text_embed_folder, + return_text_emb=True, + vae_latent_folder=vae_latent_folder, + return_vae_latent=args.train_with_vae_latent, + vae_scale_factor=args.sd_scale_factor, + sample_size=img_w, # img_w == img_h + sample_stride=args.frame_stride, + sample_n_frames=args.num_frames, + tokenizer=None, + video_column=args.video_column, + caption_column=args.caption_column, + disable_flip=args.disable_flip, + filter_data=args.filter_data, + ) + dataloader = create_dataloader( + ds_config, + batch_size=batch_size, + shuffle=True, + device_num=device_num, + rank_id=rank_id, + num_parallel_workers=args.num_parallel_workers, + max_rowsize=args.max_rowsize, + ) + num_src_samples = batch_size * dataloader.get_dataset_size() * (device_num if device_num is not None else 1) + + datasets = [0] + else: + from opensora.datasets.bucket import Bucket, bucket_split_function + from opensora.datasets.mask_generator import MaskGenerator + from opensora.datasets.video_dataset_refactored import VideoDatasetRefactored, create_dataloader + + # from mindone.data import create_dataloader + if validation: + mask_gen = MaskGenerator({"identity": 1.0}) + all_buckets, individual_buckets = None, [None] + if bucket_config is not None: + all_buckets = Bucket(bucket_config) + # Build a new bucket for each resolution and number of frames for the validation stage + individual_buckets = [ + Bucket({res: {num_frames: [1.0, bucket_config[res][num_frames][1]]}}) + for res in bucket_config.keys() + for num_frames in bucket_config[res].keys() + ] + else: + mask_gen = MaskGenerator(args.mask_ratios) + all_buckets = Bucket(bucket_config) if bucket_config is not None else None + individual_buckets = [all_buckets] + + # output_columns=["video", "caption", "mask", "fps", "num_frames", "frames_mask"], + output_columns = ["video", "caption", "mask", "frames_mask", "num_frames", "height", "width", "fps", "ar"] + if args.pre_patchify: + output_columns.extend(["spatial_pos", "spatial_mask", "temporal_pos", "temporal_mask"]) + + datasets = [ + VideoDatasetRefactored( + csv_path=csv_path, + video_folder=video_folder, + text_emb_folder=text_embed_folder, + vae_latent_folder=vae_latent_folder, + vae_scale_factor=args.sd_scale_factor, + sample_n_frames=args.num_frames, + sample_stride=args.frame_stride, + frames_mask_generator=mask_gen, + 
t_compress_func=lambda x: vae.get_latent_size((x, None, None))[0], + buckets=buckets, + filter_data=args.filter_data, + pre_patchify=args.pre_patchify, + patch_size=latte_model.patch_size, + embed_dim=latte_model.hidden_size, + num_heads=latte_model.num_heads, + max_target_size=args.max_image_size, + input_sq_size=latte_model.input_sq_size, + in_channels=latte_model.in_channels, + apply_train_transforms=True, + target_size=(img_h, img_w), + video_backend=args.video_backend, + output_columns=output_columns, + ) + for buckets in individual_buckets + ] + + num_src_samples = sum([len(ds) for ds in datasets]) + + dataloaders = [ + create_dataloader( + dataset, + batch_size=batch_size if all_buckets is None else 0, # Turn off batching if using buckets + shuffle=not validation, + device_num=device_num, + rank_id=rank_id, + num_parallel_workers=args.num_parallel_workers, + drop_remainder=not validation, + prefetch_size=args.prefetch_size, + max_rowsize=args.max_rowsize, + debug=args.debug, + ) + for dataset in datasets + ] + dataloader = ms.dataset.ConcatDataset(dataloaders) if len(dataloaders) > 1 else dataloaders[0] + + if all_buckets is not None: + hash_func, bucket_boundaries, bucket_batch_sizes = bucket_split_function(all_buckets) + dataloader = dataloader.bucket_batch_by_length( + ["video"], + bucket_boundaries, + bucket_batch_sizes, + element_length_function=hash_func, + drop_remainder=not validation, + ) + return dataloader, num_src_samples, datasets[0] + + +def main(args): + if args.add_datetime: + time_str = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S") + args.output_path = os.path.join(args.output_path, time_str) + + # 1. init + rank_id, device_num = init_env( + args.mode, + seed=args.seed, + distributed=args.use_parallel, + device_target=args.device_target, + max_device_memory=args.max_device_memory, + parallel_mode=args.parallel_mode, + jit_level=args.jit_level, + global_bf16=args.global_bf16, + dynamic_shape=(args.bucket_config is not None), + debug=args.debug, + ) + set_logger(name="", output_dir=args.output_path, rank=rank_id, log_level=eval(args.log_level)) + + # 2. model initiate and weight loading + dtype_map = {"fp16": ms.float16, "bf16": ms.bfloat16} + + img_h, img_w = None, None + if args.pre_patchify: + img_h, img_w = args.max_image_size, args.max_image_size + elif args.image_size is not None: + img_h, img_w = args.image_size if isinstance(args.image_size, list) else (args.image_size, args.image_size) + elif args.bucket_config is None: + if args.resolution is None or args.aspect_ratio is None: + raise ValueError( + "`resolution` and `aspect_ratio` must be provided if `image_size` or `bucket_config` are not provided" + ) + img_h, img_w = get_image_size(args.resolution, args.aspect_ratio) + + if args.model_version == "v1": + assert img_h == img_w, "OpenSora v1 support square images only." 
+ + # 2.1 vae + logger.info("vae init") + train_with_vae_latent = args.vae_latent_folder is not None and os.path.exists(args.vae_latent_folder) + if not train_with_vae_latent: + if args.vae_type in [None, "VideoAutoencoderKL"]: + vae = VideoAutoencoderKL( + config=SD_CONFIG, ckpt_path=args.vae_checkpoint, micro_batch_size=args.vae_micro_batch_size + ) + elif args.vae_type == "OpenSoraVAE_V1_2": + if args.vae_micro_frame_size != 17: + logger.warning("vae_micro_frame_size should be 17 to align with the vae pretrain setting.") + vae = OpenSoraVAE_V1_2( + micro_batch_size=args.vae_micro_batch_size, + micro_frame_size=args.vae_micro_frame_size, + ckpt_path=args.vae_checkpoint, + freeze_vae_2d=True, + ) + else: + raise ValueError(f"Unknown VAE type: {args.vae_type}") + vae = vae.set_train(False) + + for param in vae.get_parameters(): + param.requires_grad = False + if args.vae_param_dtype in ["fp16", "bf16"]: + # filter out norm + if "norm" not in param.name: + param.set_dtype(dtype_map[args.vae_param_dtype]) + + if args.vae_dtype in ["fp16", "bf16"]: + vae = auto_mixed_precision( + vae, + amp_level=args.vae_amp_level, + dtype=dtype_map[args.vae_dtype], + custom_fp32_cells=[nn.GroupNorm] if args.vae_keep_gn_fp32 else [], + ) + + # infer latent size + VAE_Z_CH = vae.out_channels + latent_size = vae.get_latent_size((args.num_frames, img_h, img_w)) + else: + # vae cache + vae = None + assert args.vae_type != "OpenSoraVAE_V1_2", "vae cache is not supported with 3D VAE currently." + VAE_Z_CH = SD_CONFIG["z_channels"] + VAE_T_COMPRESS = 1 + VAE_S_COMPRESS = 8 + latent_size = (args.num_frames // VAE_T_COMPRESS, img_h // VAE_S_COMPRESS, img_w // VAE_S_COMPRESS) + + # 2.2 stdit + if args.model_version == "v1": + assert img_h == img_w, "OpenSora v1 support square images only." + + patchify_conv3d_replace = "linear" if args.pre_patchify else args.patchify + model_extra_args = dict( + input_size=latent_size, + in_channels=VAE_Z_CH, + model_max_length=args.model_max_length, + patchify_conv3d_replace=patchify_conv3d_replace, # for Ascend + manual_pad=args.manual_pad, + enable_flashattn=args.enable_flash_attention, + use_recompute=args.use_recompute, + num_recompute_blocks=args.num_recompute_blocks, + ) + + if args.pre_patchify and args.model_version != "v1.1": + raise ValueError("`pre_patchify=True` can only be used in model version 1.1.") + + if args.model_version == "v1": + model_name = "STDiT" + model_extra_args.update( + { + "space_scale": args.space_scale, # 0.5 for 256x256. 1. 
for 512 + "time_scale": args.time_scale, + } + ) + latte_model = STDiT_XL_2(**model_extra_args) + elif args.model_version == "v1.1": + model_name = "STDiT2" + model_extra_args.update({"input_sq_size": 512, "qk_norm": True}) + latte_model = STDiT2_XL_2(**model_extra_args) + elif args.model_version == "v1.2": + model_name = "STDiT3" + model_extra_args["qk_norm"] = True + model_extra_args["freeze_y_embedder"] = args.freeze_y_embedder + latte_model = STDiT3_XL_2(**model_extra_args) + else: + raise ValueError(f"Unknown model version: {args.model_version}") + logger.info(f"{model_name} input size: {latent_size if args.bucket_config is None else 'Variable'}") + + # mixed precision + if args.dtype in ["fp16", "bf16"]: + if not args.global_bf16: + latte_model = auto_mixed_precision( + latte_model, + amp_level=args.amp_level, + dtype=dtype_map[args.dtype], + custom_fp32_cells=WHITELIST_OPS, + ) + # load checkpoint + if len(args.pretrained_model_path) > 0: + logger.info(f"Loading ckpt {args.pretrained_model_path}...") + latte_model.load_from_checkpoint(args.pretrained_model_path) + else: + logger.info("Use random initialization for Latte") + latte_model.set_train(True) + + if (latent_size[1] and latent_size[1] % latte_model.patch_size[1]) or ( + latent_size[2] and latent_size[2] % latte_model.patch_size[2] + ): + height_ = latte_model.patch_size[1] * 8 # FIXME + width_ = latte_model.patch_size[2] * 8 # FIXME + msg = f"Image height ({img_h}) and width ({img_w}) should be divisible by {height_} and {width_} respectively." + if patchify_conv3d_replace == "linear": + raise ValueError(msg) + else: + logger.warning(msg) + + # 2.3 ldm with loss + logger.info(f"Train with vae latent cache: {train_with_vae_latent}") + diffusion = create_diffusion(timestep_respacing="") + latent_diffusion_eval, metrics = None, {} + pipeline_kwargs = dict( + scale_factor=args.sd_scale_factor, + cond_stage_trainable=False, + text_emb_cached=True, + video_emb_cached=train_with_vae_latent, + ) + if args.noise_scheduler.lower() == "ddpm": + if args.validate: + logger.warning( + "Validation is supported with Rectified Flow noise scheduler only. No validation will be performed." + ) + if args.pre_patchify: + additional_pipeline_kwargs = dict( + patch_size=latte_model.patch_size, + max_image_size=args.max_image_size, + vae_downsample_rate=8.0, + in_channels=latte_model.in_channels, + ) + pipeline_kwargs.update(additional_pipeline_kwargs) + pipeline_ = DiffusionWithLossFiTLike + else: + pipeline_ = DiffusionWithLoss + elif args.noise_scheduler.lower() == "rflow": + if args.validate: + if args.val_bucket_config is None: + metrics = {"Validation loss": BucketLoss(str((img_h, img_w)), {(img_h, img_w)}, args.num_frames)} + else: + metrics = { + f"Validation loss {res}x{frames}": BucketLoss(res, set(ASPECT_RATIOS[res][1].values()), frames) + for res, val in args.val_bucket_config.items() + for frames in val.keys() + } + latent_diffusion_eval = RFlowEvalDiffusionWithLoss( + latte_model, + diffusion, + num_eval_timesteps=args.num_eval_timesteps, + vae=vae, + text_encoder=None, + **pipeline_kwargs, + ) + pipeline_kwargs.update( + dict(sample_method=args.sample_method, use_timestep_transform=args.use_timestep_transform) + ) + pipeline_ = RFlowDiffusionWithLoss + else: + raise ValueError(f"Unknown noise scheduler: {args.noise_scheduler}") + + latent_diffusion_with_loss = pipeline_(latte_model, diffusion, vae=vae, text_encoder=None, **pipeline_kwargs) + + # 3. 
create dataset + dataloader, num_src_samples, src_dataset = initialize_dataset( + args, + args.csv_path, + args.video_folder, + args.text_embed_folder, + args.vae_latent_folder, + args.batch_size, + img_h, + img_w, + latte_model, + vae, + bucket_config=args.bucket_config, + device_num=device_num, + rank_id=rank_id, + ) + + # FIXME: get_dataset_size() is extremely slow when used with bucket_batch_by_length + if args.bucket_config is None: + dataset_size = dataloader.get_dataset_size() + else: + # steps per epoch is not constant in bucket config training + # FIXME: It is a highly relaxed estimation to ensure enough steps per epoch to sustain training. \ + # A more precise estimation or run-time infer is to be implemented. + dataset_size = math.ceil(num_src_samples / device_num) + dataloader.dataset_size = dataset_size + logger.warning( + f"Manually set dataset_size to {dataset_size} to skip get_dataset_size() for bucket config training." + ) + + val_dataloader = None + if args.validate: + val_dataloader = initialize_dataset( + args, + args.val_csv_path, + args.val_video_folder, + args.val_text_embed_folder, + args.val_vae_latent_folder, + args.val_batch_size, + img_h, + img_w, + latte_model, + vae, + bucket_config=args.val_bucket_config, + validation=True, + device_num=device_num, + rank_id=rank_id, + ) + + # compute total steps and data epochs (in unit of data sink size) + if args.dataset_sink_mode and args.sink_size != -1: + steps_per_sink = args.sink_size + else: + steps_per_sink = dataset_size + + if args.train_steps == -1: + assert args.epochs != -1 + total_train_steps = args.epochs * dataset_size + sink_epochs = math.ceil(total_train_steps / steps_per_sink) + else: + total_train_steps = args.train_steps + # asume one step need one whole epoch data to ensure enough batch loading for training + sink_epochs = total_train_steps + + if args.ckpt_save_steps == -1: + ckpt_save_interval = args.ckpt_save_interval + step_mode = False + else: + step_mode = not args.dataset_sink_mode + if not args.dataset_sink_mode: + ckpt_save_interval = args.ckpt_save_steps + else: + # still need to count interval in sink epochs + ckpt_save_interval = max(1, args.ckpt_save_steps // steps_per_sink) + if args.ckpt_save_steps % steps_per_sink != 0: + logger.warning( + f"`ckpt_save_steps` must be times of sink size or dataset_size under dataset sink mode." + f"Checkpoint will be saved every {ckpt_save_interval * steps_per_sink} steps." + ) + step_mode = step_mode if args.step_mode is None else args.step_mode + + logger.info(f"train_steps: {total_train_steps}, train_epochs: {args.epochs}, sink_size: {args.sink_size}") + logger.info(f"total train steps: {total_train_steps}, sink epochs: {sink_epochs}") + logger.info( + "ckpt_save_interval: {} {}".format( + ckpt_save_interval, "steps" if (not args.dataset_sink_mode and step_mode) else "sink epochs" + ) + ) + + # 4. build training utils: lr, optim, callbacks, trainer + # build learning rate scheduler + if not args.decay_steps: + args.decay_steps = total_train_steps - args.warmup_steps # fix lr scheduling + if args.decay_steps <= 0: + logger.warning( + f"decay_steps is {args.decay_steps}, please check epochs, dataset_size and warmup_steps. " + f"Will force decay_steps to be set to 1." 
+ ) + args.decay_steps = 1 + + lr = create_scheduler( + steps_per_epoch=dataset_size, # not used + name=args.scheduler, + lr=args.start_learning_rate, + end_lr=args.end_learning_rate, + warmup_steps=args.warmup_steps, + decay_steps=args.decay_steps, + total_steps=total_train_steps, + ) + + set_all_reduce_fusion( + latent_diffusion_with_loss.trainable_params(), + split_num=7, + distributed=args.use_parallel, + parallel_mode=args.parallel_mode, + ) + + # build optimizer + optimizer = create_optimizer( + latent_diffusion_with_loss.trainable_params(), + name=args.optim, + betas=args.betas, + eps=args.optim_eps, + group_strategy=args.group_strategy, + weight_decay=args.weight_decay, + lr=lr, + ) + + if args.loss_scaler_type == "dynamic": + loss_scaler = DynamicLossScaleUpdateCell( + loss_scale_value=args.init_loss_scale, scale_factor=args.loss_scale_factor, scale_window=args.scale_window + ) + elif args.loss_scaler_type == "static": + loss_scaler = nn.FixedLossScaleUpdateCell(args.init_loss_scale) + else: + raise ValueError + + # resume ckpt + ckpt_dir = os.path.join(args.output_path, "ckpt") + start_epoch = 0 + cur_iter = 0 + if args.resume: + resume_ckpt = get_resume_ckpt(args.resume, args.output_path) + if resume_ckpt is not None: + start_epoch, cur_iter, loss_scale = get_resume_states(resume_ckpt) + loss_scaler.loss_scale_value = loss_scale + logger.info(f"Resumed loss_scaler, prev epoch: {start_epoch}, global step {cur_iter}") + + # trainer (standalone and distributed) + ema = EMA(latent_diffusion_with_loss.network, ema_decay=args.ema_decay, offloading=True) if args.use_ema else None + + net_with_grads = TrainOneStepWrapper( + latent_diffusion_with_loss, + optimizer=optimizer, + scale_sense=loss_scaler, + drop_overflow_update=args.drop_overflow_update, + gradient_accumulation_steps=args.gradient_accumulation_steps, + clip_grad=args.clip_grad, + clip_norm=args.max_grad_norm, + ema=ema, + ) + + # resume train net states + if args.resume and resume_ckpt is not None: + resume_train_net(net_with_grads, resume_ckpt) + + if (args.mode == 0) and (args.bucket_config is not None): + video = ms.Tensor(shape=[None, None, 3, None, None], dtype=ms.float32) + caption = ms.Tensor(shape=[None, args.model_max_length, 4096], dtype=ms.float32) + mask = ms.Tensor(shape=[None, args.model_max_length], dtype=ms.uint8) + frames_mask = ms.Tensor(shape=[None, None], dtype=ms.bool_) + # fmt: off + num_frames = ms.Tensor(shape=[None, ], dtype=ms.float32) + height = ms.Tensor(shape=[None, ], dtype=ms.float32) + width = ms.Tensor(shape=[None, ], dtype=ms.float32) + fps = ms.Tensor(shape=[None, ], dtype=ms.float32) + ar = ms.Tensor(shape=[None, ], dtype=ms.float32) + # fmt: on + net_with_grads.set_inputs(video, caption, mask, frames_mask, num_frames, height, width, fps, ar) + logger.info("Dynamic inputs are initialized for bucket config training in Graph mode!") + + if not args.custom_train: + if args.global_bf16: + model = Model(net_with_grads, eval_network=latent_diffusion_eval, metrics=metrics, amp_level="O0") + else: + model = Model(net_with_grads, eval_network=latent_diffusion_eval, metrics=metrics) + + # callbacks + callbacks = [OverflowMonitor(), EMAEvalSwapCallback(ema)] + if args.bucket_config is None: + callbacks.append(TimeMonitor(args.log_interval)) + else: + logger.info( + "As steps per epoch are inaccurate with bucket config, TimeMonitor is disabled. 
See result.log for the actual step time" + ) + if rank_id == 0: + save_cb = EvalSaveCallback( + network=latent_diffusion_with_loss.network, + rank_id=rank_id, + ckpt_save_dir=ckpt_dir, + ema=ema, + save_ema_only=False, + ckpt_save_policy="latest_k", + ckpt_max_keep=args.ckpt_max_keep, + step_mode=step_mode, + use_step_unit=(args.ckpt_save_steps != -1), + ckpt_save_interval=ckpt_save_interval, + log_interval=args.log_interval, + start_epoch=start_epoch, + model_name=model_name, + resume_prefix_blacklist=["vae.", "swap."], + record_lr=False, + train_steps=args.train_steps, + ) + rec_cb = PerfRecorderCallback( + save_dir=args.output_path, + file_name="result_val.log", + metric_names=list(metrics.keys()), + resume=args.resume, + ) + callbacks.extend([save_cb, rec_cb]) + if args.profile: + callbacks.append(ProfilerCallbackEpoch(2, 3, "./profile_data")) + + if args.train_steps > 0: + callbacks.append(StopAtStepCallback(args.train_steps, global_step=cur_iter)) + + # 5. log and save config + if rank_id == 0: + if vae is not None: + num_params_vae, num_params_vae_trainable = count_params(vae) + else: + num_params_vae, num_params_vae_trainable = 0, 0 + num_params_latte, num_params_latte_trainable = count_params(latte_model) + num_params = num_params_vae + num_params_latte + num_params_trainable = num_params_vae_trainable + num_params_latte_trainable + key_info = "Key Settings:\n" + "=" * 50 + "\n" + key_info += "\n".join( + [ + f"MindSpore mode[GRAPH(0)/PYNATIVE(1)]: {args.mode}", + f"Jit level: {args.jit_level}", + f"Distributed mode: {args.use_parallel}", + f"Num params: {num_params:,} (latte: {num_params_latte:,}, vae: {num_params_vae:,})", + f"Num trainable params: {num_params_trainable:,}", + f"{model_name} dtype: {args.dtype}", + f"VAE dtype: {args.vae_dtype}", + f"Learning rate: {args.start_learning_rate}", + f"Batch size: {args.batch_size if args.bucket_config is None else 'Variable'}", + f"Image size: {(img_h, img_w) if args.bucket_config is None else 'Variable'}", + f"Frames: {args.num_frames if args.bucket_config is None else 'Variable'}", + f"Latent size: {latent_size if args.bucket_config is None else 'Variable'}", + f"Weight decay: {args.weight_decay}", + f"Grad accumulation steps: {args.gradient_accumulation_steps}", + f"Num epochs: {args.epochs}", + f"Loss scaler: {args.loss_scaler_type}", + f"Init loss scale: {args.init_loss_scale}", + f"Grad clipping: {args.clip_grad}", + f"Max grad norm: {args.max_grad_norm}", + f"EMA: {args.use_ema}", + f"Enable flash attention: {args.enable_flash_attention}", + f"Use recompute: {args.use_recompute}", + f"Dataset sink: {args.dataset_sink_mode}", + f"Resume training: {args.resume}", + ] + ) + key_info += "\n" + "=" * 50 + logger.info(key_info) + + logger.info("Start training...") + + with open(os.path.join(args.output_path, "args.yaml"), "w") as f: + yaml.safe_dump(vars(args), stream=f, default_flow_style=False, sort_keys=False) + + # 6. train + if not args.custom_train: + model.fit( + sink_epochs, + dataloader, + valid_dataset=val_dataloader, + valid_frequency=args.val_interval, + callbacks=callbacks, + dataset_sink_mode=args.dataset_sink_mode, + valid_dataset_sink_mode=False, # TODO: add support? 
+ sink_size=args.sink_size, + initial_epoch=start_epoch, + ) + + else: + assert not args.dataset_sink_mode, "data sink not supported for custom train process currently" + + # re-count training steps and epochs + if args.train_steps > 0: + # ensure num_epochs >= train_steps/steps_per_epoch, but steps_per_epoch is uncertain with dynamic BS, the safest bound is to assume it to be 1. + # Note that it's not the actual data epochs that will be run. Training process will terminate in train_steps + num_epochs = args.train_steps + else: + assert args.epochs > 0, "args.epochs must be given and > 0 if train_steps is not specified" + # the actual data epochs to be run in this case + num_epochs = args.epochs + global_step = cur_iter # index start from 1 (after first-step network update) + + if args.ckpt_save_steps > 0: + save_by_step = True + else: + save_by_step = False + + if rank_id == 0: + ckpt_manager = CheckpointManager(ckpt_dir, "latest_k", k=args.ckpt_max_keep) + if not os.path.exists(ckpt_dir): + os.makedirs(ckpt_dir) + perf_columns = ["step", "loss", "train_time(s)", "shape"] + output_dir = ckpt_dir.replace("/ckpt", "") + if start_epoch == 0: + record = PerfRecorder(output_dir, metric_names=perf_columns) + else: + record = PerfRecorder(output_dir, resume=True) + + # ds_iter = dataloader.create_tuple_iterator(num_epochs=num_epochs - start_epoch) + # ds_iter = dataloader.create_tuple_iterator(num_epochs=-1) # infinite + end_train = False + # import pdb; pdb.set_trace() + print(f"D--: start_epoch {start_epoch}, num_epochs {num_epochs}") + for epoch in range(start_epoch + 1, num_epochs + 1): + if (args.train_steps > 0) and (global_step >= args.train_steps): + logger.warning("resumed steps >= train_steps, will end training") + break + + start_time_s = time.time() + samples_per_epoch = num_src_samples + + for step in range(samples_per_epoch): + data = src_dataset.__getitem__(step)[0] + + # loss, overflow, scaling_sens = net_with_grads(*data) + global_step += 1 + step_time = time.time() - start_time_s + + # log + print(data[0].shape) + loss_val = 0 # float(loss.asnumpy()) + logger.info( + f"Epoch {epoch}, Step {step}, loss {loss_val:.5f}, Global step {global_step}, Shape: {tuple(data[0].shape)}, Step time {step_time*1000:.2f}ms" + ) + + if (args.train_steps > 0) and (global_step >= args.train_steps): + end_train = True + break + + start_time_s = time.time() + + # dataloader.reset() + # flush_from_cache(net_with_grads) + + if end_train: + break + + logger.info("Finished training. Ending process...") + reset_op_id() + time.sleep(60) + logger.info("End") + + +if __name__ == "__main__": + logger.debug("process id:", os.getpid()) + args = parse_args() + main(args) From 277410cf7f053c00f762608ef109dff080bbe56c Mon Sep 17 00:00:00 2001 From: samithuang <285365963@qq.com> Date: Thu, 12 Sep 2024 10:23:27 +0800 Subject: [PATCH 24/32] acc by add Symbol --- examples/opensora_hpcai/scripts/train.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/examples/opensora_hpcai/scripts/train.py b/examples/opensora_hpcai/scripts/train.py index f1413a9a9f..3d657f7c87 100644 --- a/examples/opensora_hpcai/scripts/train.py +++ b/examples/opensora_hpcai/scripts/train.py @@ -147,10 +147,10 @@ def init_env( if dynamic_shape: logger.info("Dynamic shape mode enabled, repeat_interleave/split/chunk will be called from mint module") set_dynamic_mode(True) - if mode == 0: + # if mode == 0: # FIXME: this is a temp fix for dynamic shape training in graph mode. may remove in future version. 
# can append adamw fusion flag if use nn.AdamW optimzation for acceleration - ms.set_context(graph_kernel_flags="--disable_packet_ops=Reshape") + # ms.set_context(graph_kernel_flags="--disable_packet_ops=Reshape") return rank_id, device_num @@ -679,16 +679,17 @@ def main(args): resume_train_net(net_with_grads, resume_ckpt) if (args.mode == 0) and (args.bucket_config is not None): - video = ms.Tensor(shape=[None, None, 3, None, None], dtype=ms.float32) - caption = ms.Tensor(shape=[None, args.model_max_length, 4096], dtype=ms.float32) - mask = ms.Tensor(shape=[None, args.model_max_length], dtype=ms.uint8) - frames_mask = ms.Tensor(shape=[None, None], dtype=ms.bool_) + _bs = ms.Symbol(unique=True) + video = ms.Tensor(shape=[_bs, None, 3, None, None], dtype=ms.float32) + caption = ms.Tensor(shape=[_bs, args.model_max_length, 4096], dtype=ms.float32) + mask = ms.Tensor(shape=[_bs, args.model_max_length], dtype=ms.uint8) + frames_mask = ms.Tensor(shape=[_bs, None], dtype=ms.bool_) # fmt: off - num_frames = ms.Tensor(shape=[None, ], dtype=ms.float32) - height = ms.Tensor(shape=[None, ], dtype=ms.float32) - width = ms.Tensor(shape=[None, ], dtype=ms.float32) - fps = ms.Tensor(shape=[None, ], dtype=ms.float32) - ar = ms.Tensor(shape=[None, ], dtype=ms.float32) + num_frames = ms.Tensor(shape=[_bs, ], dtype=ms.float32) + height = ms.Tensor(shape=[_bs, ], dtype=ms.float32) + width = ms.Tensor(shape=[_bs, ], dtype=ms.float32) + fps = ms.Tensor(shape=[_bs, ], dtype=ms.float32) + ar = ms.Tensor(shape=[_bs, ], dtype=ms.float32) # fmt: on net_with_grads.set_inputs(video, caption, mask, frames_mask, num_frames, height, width, fps, ar) logger.info("Dynamic inputs are initialized for bucket config training in Graph mode!") From 0a83a37d5e3245d217397c8fc1602f432a29268b Mon Sep 17 00:00:00 2001 From: Samit <285365963@qq.com> Date: Fri, 13 Sep 2024 10:07:31 +0800 Subject: [PATCH 25/32] clear mem in the end of epoch --- examples/opensora_hpcai/opensora/models/vae/modules.py | 4 ++-- examples/opensora_hpcai/scripts/train.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/opensora_hpcai/opensora/models/vae/modules.py b/examples/opensora_hpcai/opensora/models/vae/modules.py index 94fa68f7fc..83703faef6 100644 --- a/examples/opensora_hpcai/opensora/models/vae/modules.py +++ b/examples/opensora_hpcai/opensora/models/vae/modules.py @@ -276,7 +276,7 @@ def construct(self, x): temb = None # downsampling - ''' + """ hs = [self.conv_in(x)] for i_level in range(self.num_resolutions): for i_block in range(self.num_res_blocks): @@ -289,7 +289,7 @@ def construct(self, x): # middle h = hs[-1] - ''' + """ hs = self.conv_in(x) for i_level in range(self.num_resolutions): for i_block in range(self.num_res_blocks): diff --git a/examples/opensora_hpcai/scripts/train.py b/examples/opensora_hpcai/scripts/train.py index 3d657f7c87..347541ba43 100644 --- a/examples/opensora_hpcai/scripts/train.py +++ b/examples/opensora_hpcai/scripts/train.py @@ -299,7 +299,7 @@ def initialize_dataset( bucket_boundaries, bucket_batch_sizes, element_length_function=hash_func, - drop_remainder=not validation, + drop_remainder=False, ) return dataloader, num_src_samples From e706e2c462b67aabe145b6eb6aab282a07761b17 Mon Sep 17 00:00:00 2001 From: Samit <285365963@qq.com> Date: Fri, 13 Sep 2024 14:59:18 +0800 Subject: [PATCH 26/32] update doc --- examples/opensora_hpcai/README.md | 4 +- .../scripts/run/run_train_os1.2_stage2.sh | 2 +- .../opensora_hpcai/scripts/test_datasource.py | 882 ------------------ 
 examples/opensora_hpcai/scripts/train.py      |   9 +-
 .../tools/analyze_buckt_result.py             |  26 +-
 5 files changed, 22 insertions(+), 901 deletions(-)
 delete mode 100644 examples/opensora_hpcai/scripts/test_datasource.py

diff --git a/examples/opensora_hpcai/README.md b/examples/opensora_hpcai/README.md
index 6b4bfd1bef..67f5f62776 100644
--- a/examples/opensora_hpcai/README.md
+++ b/examples/opensora_hpcai/README.md
@@ -517,9 +517,11 @@ bucket_config:
   "2048": { 1: [ 0.01, 5 ] }
 ```
 
+Since the optimal bucket config varies from device to device, we have tuned and provided bucket configs that are more balanced on Ascend + MindSpore in `configs/opensora-v1-2/train/{stage}_ms.yaml`. You may use them for better training performance.
+
 More details on the bucket configuration can be found in [Multi-resolution Training with Buckets](./docs/quick_start.md#4-multi-resolution-training-with-buckets-opensora-v11-and-above).
 
-Then you can launch the dynamic training task following the previous section. An example running script is `scripts/run/run_train_os1.2_stage2.sh`.
+The instructions for launching the dynamic training task are similar to those in the previous section. An example running script is `scripts/run/run_train_os1.2_stage2.sh`.
 
 ### Open-Sora 1.1
diff --git a/examples/opensora_hpcai/scripts/run/run_train_os1.2_stage2.sh b/examples/opensora_hpcai/scripts/run/run_train_os1.2_stage2.sh
index 2f9ade3ebc..13a92392a0 100644
--- a/examples/opensora_hpcai/scripts/run/run_train_os1.2_stage2.sh
+++ b/examples/opensora_hpcai/scripts/run/run_train_os1.2_stage2.sh
@@ -19,7 +19,7 @@ python scripts/train.py \
 --mode=0 \
 --jit_level O1 \
 --max_device_memory 55GB \
---config configs/opensora-v1-2/train/train_stage2.yaml \
+--config configs/opensora-v1-2/train/train_stage2_ms.yaml \
 --csv_path datasets/mixkit-100videos/video_caption_train.csv \
 --video_folder datasets/mixkit-100videos/mixkit \
 --text_embed_folder datasets/mixkit-100videos/t5_emb_300 \
diff --git a/examples/opensora_hpcai/scripts/test_datasource.py b/examples/opensora_hpcai/scripts/test_datasource.py
deleted file mode 100644
index f45ff442a7..0000000000
--- a/examples/opensora_hpcai/scripts/test_datasource.py
+++ /dev/null
@@ -1,882 +0,0 @@
-"""
-STDiT training script
-"""
-import datetime
-import logging
-import math
-import os
-import sys
-import time
-from typing import Optional, Tuple
-
-import yaml
-
-import mindspore as ms
-from mindspore import nn
-from mindspore._c_expression import reset_op_id
-from mindspore.communication.management import get_group_size, get_rank, init
-from mindspore.nn.wrap.loss_scale import DynamicLossScaleUpdateCell
-from mindspore.train.callback import TimeMonitor
-
-__dir__ = os.path.dirname(os.path.abspath(__file__))
-mindone_lib_path = os.path.abspath(os.path.join(__dir__, "../../../"))
-sys.path.insert(0, mindone_lib_path)
-sys.path.insert(0, os.path.abspath(os.path.join(__dir__, "..")))
-from args_train import parse_args
-from opensora.datasets.aspect import ASPECT_RATIOS, get_image_size
-from opensora.models.layers.operation_selector import set_dynamic_mode
-from opensora.models.stdit import STDiT2_XL_2, STDiT3_XL_2, STDiT_XL_2
-from opensora.models.vae.vae import SD_CONFIG, OpenSoraVAE_V1_2, VideoAutoencoderKL
-from opensora.pipelines import (
-    DiffusionWithLoss,
-    DiffusionWithLossFiTLike,
-    RFlowDiffusionWithLoss,
-    RFlowEvalDiffusionWithLoss,
-)
-from opensora.schedulers.iddpm import create_diffusion
-from opensora.utils.amp import auto_mixed_precision
-from opensora.utils.callbacks import EMAEvalSwapCallback,
PerfRecorderCallback -from opensora.utils.ema import EMA, save_ema_ckpts -from opensora.utils.metrics import BucketLoss -from opensora.utils.model_utils import WHITELIST_OPS, Model -from opensora.utils.resume import flush_from_cache, get_resume_ckpt, get_resume_states, resume_train_net, save_train_net - -from mindone.trainers.callback import EvalSaveCallback, OverflowMonitor, ProfilerCallbackEpoch, StopAtStepCallback -from mindone.trainers.checkpoint import CheckpointManager -from mindone.trainers.lr_schedule import create_scheduler -from mindone.trainers.optim import create_optimizer -from mindone.trainers.recorder import PerfRecorder -from mindone.trainers.train_step import TrainOneStepWrapper -from mindone.utils.logger import set_logger -from mindone.utils.params import count_params -from mindone.utils.seed import set_random_seed - -os.environ["HCCL_CONNECT_TIMEOUT"] = "6000" -os.environ["MS_ASCEND_CHECK_OVERFLOW_MODE"] = "INFNAN_MODE" - -logger = logging.getLogger(__name__) - - -def init_env( - mode: int = ms.GRAPH_MODE, - seed: int = 42, - distributed: bool = False, - max_device_memory: str = None, - device_target: str = "Ascend", - parallel_mode: str = "data", - jit_level: str = "O0", - global_bf16: bool = False, - dynamic_shape: bool = False, - debug: bool = False, -) -> Tuple[int, int]: - """ - Initialize MindSpore environment. - - Args: - mode: MindSpore execution mode. Default is 0 (ms.GRAPH_MODE). - seed: The seed value for reproducibility. Default is 42. - distributed: Whether to enable distributed training. Default is False. - Returns: - A tuple containing the device ID, rank ID and number of devices. - """ - set_random_seed(seed) - - if debug and mode == ms.GRAPH_MODE: # force PyNative mode when debugging - logger.warning("Debug mode is on, switching execution mode to PyNative.") - mode = ms.PYNATIVE_MODE - - if max_device_memory is not None: - ms.set_context(max_device_memory=max_device_memory) - - if distributed: - ms.set_context( - mode=mode, - device_target=device_target, - ) - if parallel_mode == "optim": - print("use optim parallel") - ms.set_auto_parallel_context( - parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, - enable_parallel_optimizer=True, - ) - init() - device_num = get_group_size() - rank_id = get_rank() - else: - init() - device_num = get_group_size() - rank_id = get_rank() - logger.debug(f"rank_id: {rank_id}, device_num: {device_num}") - ms.reset_auto_parallel_context() - - ms.set_auto_parallel_context( - parallel_mode=ms.ParallelMode.DATA_PARALLEL, - gradients_mean=True, - device_num=device_num, - ) - - var_info = ["device_num", "rank_id", "device_num / 8", "rank_id / 8"] - var_value = [device_num, rank_id, int(device_num / 8), int(rank_id / 8)] - logger.info(dict(zip(var_info, var_value))) - - else: - device_num = 1 - rank_id = 0 - ms.set_context( - mode=mode, - device_target=device_target, - pynative_synchronize=debug, - ) - - try: - if jit_level in ["O0", "O1", "O2"]: - ms.set_context(jit_config={"jit_level": jit_level}) - else: - logger.warning( - f"Unsupported jit_level: {jit_level}. The framework will automatically select the execution mode." - ) - except Exception: - logger.warning( - "The current jit_level is not suitable because current MindSpore version or mode does not match," - "please ensure the MindSpore version >= ms2.3_0615, and use GRAPH_MODE." 
- ) - - if global_bf16: - ms.set_context(ascend_config={"precision_mode": "allow_mix_precision_bf16"}) - - if dynamic_shape: - logger.info("Dynamic shape mode enabled, repeat_interleave/split/chunk will be called from mint module") - set_dynamic_mode(True) - if mode == 0: - # FIXME: this is a temp fix for dynamic shape training in graph mode. may remove in future version. - # can append adamw fusion flag if use nn.AdamW optimzation for acceleration - ms.set_context(graph_kernel_flags="--disable_packet_ops=Reshape") - - return rank_id, device_num - - -def set_all_reduce_fusion( - params, - split_num: int = 7, - distributed: bool = False, - parallel_mode: str = "data", -) -> None: - """Set allreduce fusion strategy by split_num.""" - - if distributed and parallel_mode == "data": - all_params_num = len(params) - step = all_params_num // split_num - split_list = [i * step for i in range(1, split_num)] - split_list.append(all_params_num - 1) - logger.info(f"Distribute config set: dall_params_num: {all_params_num}, set all_reduce_fusion: {split_list}") - ms.set_auto_parallel_context(all_reduce_fusion_config=split_list) - - -def initialize_dataset( - args, - csv_path, - video_folder, - text_embed_folder, - vae_latent_folder, - batch_size, - img_h, - img_w, - latte_model, - vae, - bucket_config: Optional[dict] = None, - validation: bool = False, - device_num: int = 1, - rank_id: int = 0, -): - if args.model_version == "v1": - from opensora.datasets.t2v_dataset import create_dataloader - - ds_config = dict( - csv_path=csv_path, - video_folder=video_folder, - text_emb_folder=text_embed_folder, - return_text_emb=True, - vae_latent_folder=vae_latent_folder, - return_vae_latent=args.train_with_vae_latent, - vae_scale_factor=args.sd_scale_factor, - sample_size=img_w, # img_w == img_h - sample_stride=args.frame_stride, - sample_n_frames=args.num_frames, - tokenizer=None, - video_column=args.video_column, - caption_column=args.caption_column, - disable_flip=args.disable_flip, - filter_data=args.filter_data, - ) - dataloader = create_dataloader( - ds_config, - batch_size=batch_size, - shuffle=True, - device_num=device_num, - rank_id=rank_id, - num_parallel_workers=args.num_parallel_workers, - max_rowsize=args.max_rowsize, - ) - num_src_samples = batch_size * dataloader.get_dataset_size() * (device_num if device_num is not None else 1) - - datasets = [0] - else: - from opensora.datasets.bucket import Bucket, bucket_split_function - from opensora.datasets.mask_generator import MaskGenerator - from opensora.datasets.video_dataset_refactored import VideoDatasetRefactored, create_dataloader - - # from mindone.data import create_dataloader - if validation: - mask_gen = MaskGenerator({"identity": 1.0}) - all_buckets, individual_buckets = None, [None] - if bucket_config is not None: - all_buckets = Bucket(bucket_config) - # Build a new bucket for each resolution and number of frames for the validation stage - individual_buckets = [ - Bucket({res: {num_frames: [1.0, bucket_config[res][num_frames][1]]}}) - for res in bucket_config.keys() - for num_frames in bucket_config[res].keys() - ] - else: - mask_gen = MaskGenerator(args.mask_ratios) - all_buckets = Bucket(bucket_config) if bucket_config is not None else None - individual_buckets = [all_buckets] - - # output_columns=["video", "caption", "mask", "fps", "num_frames", "frames_mask"], - output_columns = ["video", "caption", "mask", "frames_mask", "num_frames", "height", "width", "fps", "ar"] - if args.pre_patchify: - output_columns.extend(["spatial_pos", 
"spatial_mask", "temporal_pos", "temporal_mask"]) - - datasets = [ - VideoDatasetRefactored( - csv_path=csv_path, - video_folder=video_folder, - text_emb_folder=text_embed_folder, - vae_latent_folder=vae_latent_folder, - vae_scale_factor=args.sd_scale_factor, - sample_n_frames=args.num_frames, - sample_stride=args.frame_stride, - frames_mask_generator=mask_gen, - t_compress_func=lambda x: vae.get_latent_size((x, None, None))[0], - buckets=buckets, - filter_data=args.filter_data, - pre_patchify=args.pre_patchify, - patch_size=latte_model.patch_size, - embed_dim=latte_model.hidden_size, - num_heads=latte_model.num_heads, - max_target_size=args.max_image_size, - input_sq_size=latte_model.input_sq_size, - in_channels=latte_model.in_channels, - apply_train_transforms=True, - target_size=(img_h, img_w), - video_backend=args.video_backend, - output_columns=output_columns, - ) - for buckets in individual_buckets - ] - - num_src_samples = sum([len(ds) for ds in datasets]) - - dataloaders = [ - create_dataloader( - dataset, - batch_size=batch_size if all_buckets is None else 0, # Turn off batching if using buckets - shuffle=not validation, - device_num=device_num, - rank_id=rank_id, - num_parallel_workers=args.num_parallel_workers, - drop_remainder=not validation, - prefetch_size=args.prefetch_size, - max_rowsize=args.max_rowsize, - debug=args.debug, - ) - for dataset in datasets - ] - dataloader = ms.dataset.ConcatDataset(dataloaders) if len(dataloaders) > 1 else dataloaders[0] - - if all_buckets is not None: - hash_func, bucket_boundaries, bucket_batch_sizes = bucket_split_function(all_buckets) - dataloader = dataloader.bucket_batch_by_length( - ["video"], - bucket_boundaries, - bucket_batch_sizes, - element_length_function=hash_func, - drop_remainder=not validation, - ) - return dataloader, num_src_samples, datasets[0] - - -def main(args): - if args.add_datetime: - time_str = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S") - args.output_path = os.path.join(args.output_path, time_str) - - # 1. init - rank_id, device_num = init_env( - args.mode, - seed=args.seed, - distributed=args.use_parallel, - device_target=args.device_target, - max_device_memory=args.max_device_memory, - parallel_mode=args.parallel_mode, - jit_level=args.jit_level, - global_bf16=args.global_bf16, - dynamic_shape=(args.bucket_config is not None), - debug=args.debug, - ) - set_logger(name="", output_dir=args.output_path, rank=rank_id, log_level=eval(args.log_level)) - - # 2. model initiate and weight loading - dtype_map = {"fp16": ms.float16, "bf16": ms.bfloat16} - - img_h, img_w = None, None - if args.pre_patchify: - img_h, img_w = args.max_image_size, args.max_image_size - elif args.image_size is not None: - img_h, img_w = args.image_size if isinstance(args.image_size, list) else (args.image_size, args.image_size) - elif args.bucket_config is None: - if args.resolution is None or args.aspect_ratio is None: - raise ValueError( - "`resolution` and `aspect_ratio` must be provided if `image_size` or `bucket_config` are not provided" - ) - img_h, img_w = get_image_size(args.resolution, args.aspect_ratio) - - if args.model_version == "v1": - assert img_h == img_w, "OpenSora v1 support square images only." 
- - # 2.1 vae - logger.info("vae init") - train_with_vae_latent = args.vae_latent_folder is not None and os.path.exists(args.vae_latent_folder) - if not train_with_vae_latent: - if args.vae_type in [None, "VideoAutoencoderKL"]: - vae = VideoAutoencoderKL( - config=SD_CONFIG, ckpt_path=args.vae_checkpoint, micro_batch_size=args.vae_micro_batch_size - ) - elif args.vae_type == "OpenSoraVAE_V1_2": - if args.vae_micro_frame_size != 17: - logger.warning("vae_micro_frame_size should be 17 to align with the vae pretrain setting.") - vae = OpenSoraVAE_V1_2( - micro_batch_size=args.vae_micro_batch_size, - micro_frame_size=args.vae_micro_frame_size, - ckpt_path=args.vae_checkpoint, - freeze_vae_2d=True, - ) - else: - raise ValueError(f"Unknown VAE type: {args.vae_type}") - vae = vae.set_train(False) - - for param in vae.get_parameters(): - param.requires_grad = False - if args.vae_param_dtype in ["fp16", "bf16"]: - # filter out norm - if "norm" not in param.name: - param.set_dtype(dtype_map[args.vae_param_dtype]) - - if args.vae_dtype in ["fp16", "bf16"]: - vae = auto_mixed_precision( - vae, - amp_level=args.vae_amp_level, - dtype=dtype_map[args.vae_dtype], - custom_fp32_cells=[nn.GroupNorm] if args.vae_keep_gn_fp32 else [], - ) - - # infer latent size - VAE_Z_CH = vae.out_channels - latent_size = vae.get_latent_size((args.num_frames, img_h, img_w)) - else: - # vae cache - vae = None - assert args.vae_type != "OpenSoraVAE_V1_2", "vae cache is not supported with 3D VAE currently." - VAE_Z_CH = SD_CONFIG["z_channels"] - VAE_T_COMPRESS = 1 - VAE_S_COMPRESS = 8 - latent_size = (args.num_frames // VAE_T_COMPRESS, img_h // VAE_S_COMPRESS, img_w // VAE_S_COMPRESS) - - # 2.2 stdit - if args.model_version == "v1": - assert img_h == img_w, "OpenSora v1 support square images only." - - patchify_conv3d_replace = "linear" if args.pre_patchify else args.patchify - model_extra_args = dict( - input_size=latent_size, - in_channels=VAE_Z_CH, - model_max_length=args.model_max_length, - patchify_conv3d_replace=patchify_conv3d_replace, # for Ascend - manual_pad=args.manual_pad, - enable_flashattn=args.enable_flash_attention, - use_recompute=args.use_recompute, - num_recompute_blocks=args.num_recompute_blocks, - ) - - if args.pre_patchify and args.model_version != "v1.1": - raise ValueError("`pre_patchify=True` can only be used in model version 1.1.") - - if args.model_version == "v1": - model_name = "STDiT" - model_extra_args.update( - { - "space_scale": args.space_scale, # 0.5 for 256x256. 1. 
for 512 - "time_scale": args.time_scale, - } - ) - latte_model = STDiT_XL_2(**model_extra_args) - elif args.model_version == "v1.1": - model_name = "STDiT2" - model_extra_args.update({"input_sq_size": 512, "qk_norm": True}) - latte_model = STDiT2_XL_2(**model_extra_args) - elif args.model_version == "v1.2": - model_name = "STDiT3" - model_extra_args["qk_norm"] = True - model_extra_args["freeze_y_embedder"] = args.freeze_y_embedder - latte_model = STDiT3_XL_2(**model_extra_args) - else: - raise ValueError(f"Unknown model version: {args.model_version}") - logger.info(f"{model_name} input size: {latent_size if args.bucket_config is None else 'Variable'}") - - # mixed precision - if args.dtype in ["fp16", "bf16"]: - if not args.global_bf16: - latte_model = auto_mixed_precision( - latte_model, - amp_level=args.amp_level, - dtype=dtype_map[args.dtype], - custom_fp32_cells=WHITELIST_OPS, - ) - # load checkpoint - if len(args.pretrained_model_path) > 0: - logger.info(f"Loading ckpt {args.pretrained_model_path}...") - latte_model.load_from_checkpoint(args.pretrained_model_path) - else: - logger.info("Use random initialization for Latte") - latte_model.set_train(True) - - if (latent_size[1] and latent_size[1] % latte_model.patch_size[1]) or ( - latent_size[2] and latent_size[2] % latte_model.patch_size[2] - ): - height_ = latte_model.patch_size[1] * 8 # FIXME - width_ = latte_model.patch_size[2] * 8 # FIXME - msg = f"Image height ({img_h}) and width ({img_w}) should be divisible by {height_} and {width_} respectively." - if patchify_conv3d_replace == "linear": - raise ValueError(msg) - else: - logger.warning(msg) - - # 2.3 ldm with loss - logger.info(f"Train with vae latent cache: {train_with_vae_latent}") - diffusion = create_diffusion(timestep_respacing="") - latent_diffusion_eval, metrics = None, {} - pipeline_kwargs = dict( - scale_factor=args.sd_scale_factor, - cond_stage_trainable=False, - text_emb_cached=True, - video_emb_cached=train_with_vae_latent, - ) - if args.noise_scheduler.lower() == "ddpm": - if args.validate: - logger.warning( - "Validation is supported with Rectified Flow noise scheduler only. No validation will be performed." - ) - if args.pre_patchify: - additional_pipeline_kwargs = dict( - patch_size=latte_model.patch_size, - max_image_size=args.max_image_size, - vae_downsample_rate=8.0, - in_channels=latte_model.in_channels, - ) - pipeline_kwargs.update(additional_pipeline_kwargs) - pipeline_ = DiffusionWithLossFiTLike - else: - pipeline_ = DiffusionWithLoss - elif args.noise_scheduler.lower() == "rflow": - if args.validate: - if args.val_bucket_config is None: - metrics = {"Validation loss": BucketLoss(str((img_h, img_w)), {(img_h, img_w)}, args.num_frames)} - else: - metrics = { - f"Validation loss {res}x{frames}": BucketLoss(res, set(ASPECT_RATIOS[res][1].values()), frames) - for res, val in args.val_bucket_config.items() - for frames in val.keys() - } - latent_diffusion_eval = RFlowEvalDiffusionWithLoss( - latte_model, - diffusion, - num_eval_timesteps=args.num_eval_timesteps, - vae=vae, - text_encoder=None, - **pipeline_kwargs, - ) - pipeline_kwargs.update( - dict(sample_method=args.sample_method, use_timestep_transform=args.use_timestep_transform) - ) - pipeline_ = RFlowDiffusionWithLoss - else: - raise ValueError(f"Unknown noise scheduler: {args.noise_scheduler}") - - latent_diffusion_with_loss = pipeline_(latte_model, diffusion, vae=vae, text_encoder=None, **pipeline_kwargs) - - # 3. 
create dataset - dataloader, num_src_samples, src_dataset = initialize_dataset( - args, - args.csv_path, - args.video_folder, - args.text_embed_folder, - args.vae_latent_folder, - args.batch_size, - img_h, - img_w, - latte_model, - vae, - bucket_config=args.bucket_config, - device_num=device_num, - rank_id=rank_id, - ) - - # FIXME: get_dataset_size() is extremely slow when used with bucket_batch_by_length - if args.bucket_config is None: - dataset_size = dataloader.get_dataset_size() - else: - # steps per epoch is not constant in bucket config training - # FIXME: It is a highly relaxed estimation to ensure enough steps per epoch to sustain training. \ - # A more precise estimation or run-time infer is to be implemented. - dataset_size = math.ceil(num_src_samples / device_num) - dataloader.dataset_size = dataset_size - logger.warning( - f"Manually set dataset_size to {dataset_size} to skip get_dataset_size() for bucket config training." - ) - - val_dataloader = None - if args.validate: - val_dataloader = initialize_dataset( - args, - args.val_csv_path, - args.val_video_folder, - args.val_text_embed_folder, - args.val_vae_latent_folder, - args.val_batch_size, - img_h, - img_w, - latte_model, - vae, - bucket_config=args.val_bucket_config, - validation=True, - device_num=device_num, - rank_id=rank_id, - ) - - # compute total steps and data epochs (in unit of data sink size) - if args.dataset_sink_mode and args.sink_size != -1: - steps_per_sink = args.sink_size - else: - steps_per_sink = dataset_size - - if args.train_steps == -1: - assert args.epochs != -1 - total_train_steps = args.epochs * dataset_size - sink_epochs = math.ceil(total_train_steps / steps_per_sink) - else: - total_train_steps = args.train_steps - # asume one step need one whole epoch data to ensure enough batch loading for training - sink_epochs = total_train_steps - - if args.ckpt_save_steps == -1: - ckpt_save_interval = args.ckpt_save_interval - step_mode = False - else: - step_mode = not args.dataset_sink_mode - if not args.dataset_sink_mode: - ckpt_save_interval = args.ckpt_save_steps - else: - # still need to count interval in sink epochs - ckpt_save_interval = max(1, args.ckpt_save_steps // steps_per_sink) - if args.ckpt_save_steps % steps_per_sink != 0: - logger.warning( - f"`ckpt_save_steps` must be times of sink size or dataset_size under dataset sink mode." - f"Checkpoint will be saved every {ckpt_save_interval * steps_per_sink} steps." - ) - step_mode = step_mode if args.step_mode is None else args.step_mode - - logger.info(f"train_steps: {total_train_steps}, train_epochs: {args.epochs}, sink_size: {args.sink_size}") - logger.info(f"total train steps: {total_train_steps}, sink epochs: {sink_epochs}") - logger.info( - "ckpt_save_interval: {} {}".format( - ckpt_save_interval, "steps" if (not args.dataset_sink_mode and step_mode) else "sink epochs" - ) - ) - - # 4. build training utils: lr, optim, callbacks, trainer - # build learning rate scheduler - if not args.decay_steps: - args.decay_steps = total_train_steps - args.warmup_steps # fix lr scheduling - if args.decay_steps <= 0: - logger.warning( - f"decay_steps is {args.decay_steps}, please check epochs, dataset_size and warmup_steps. " - f"Will force decay_steps to be set to 1." 
- ) - args.decay_steps = 1 - - lr = create_scheduler( - steps_per_epoch=dataset_size, # not used - name=args.scheduler, - lr=args.start_learning_rate, - end_lr=args.end_learning_rate, - warmup_steps=args.warmup_steps, - decay_steps=args.decay_steps, - total_steps=total_train_steps, - ) - - set_all_reduce_fusion( - latent_diffusion_with_loss.trainable_params(), - split_num=7, - distributed=args.use_parallel, - parallel_mode=args.parallel_mode, - ) - - # build optimizer - optimizer = create_optimizer( - latent_diffusion_with_loss.trainable_params(), - name=args.optim, - betas=args.betas, - eps=args.optim_eps, - group_strategy=args.group_strategy, - weight_decay=args.weight_decay, - lr=lr, - ) - - if args.loss_scaler_type == "dynamic": - loss_scaler = DynamicLossScaleUpdateCell( - loss_scale_value=args.init_loss_scale, scale_factor=args.loss_scale_factor, scale_window=args.scale_window - ) - elif args.loss_scaler_type == "static": - loss_scaler = nn.FixedLossScaleUpdateCell(args.init_loss_scale) - else: - raise ValueError - - # resume ckpt - ckpt_dir = os.path.join(args.output_path, "ckpt") - start_epoch = 0 - cur_iter = 0 - if args.resume: - resume_ckpt = get_resume_ckpt(args.resume, args.output_path) - if resume_ckpt is not None: - start_epoch, cur_iter, loss_scale = get_resume_states(resume_ckpt) - loss_scaler.loss_scale_value = loss_scale - logger.info(f"Resumed loss_scaler, prev epoch: {start_epoch}, global step {cur_iter}") - - # trainer (standalone and distributed) - ema = EMA(latent_diffusion_with_loss.network, ema_decay=args.ema_decay, offloading=True) if args.use_ema else None - - net_with_grads = TrainOneStepWrapper( - latent_diffusion_with_loss, - optimizer=optimizer, - scale_sense=loss_scaler, - drop_overflow_update=args.drop_overflow_update, - gradient_accumulation_steps=args.gradient_accumulation_steps, - clip_grad=args.clip_grad, - clip_norm=args.max_grad_norm, - ema=ema, - ) - - # resume train net states - if args.resume and resume_ckpt is not None: - resume_train_net(net_with_grads, resume_ckpt) - - if (args.mode == 0) and (args.bucket_config is not None): - video = ms.Tensor(shape=[None, None, 3, None, None], dtype=ms.float32) - caption = ms.Tensor(shape=[None, args.model_max_length, 4096], dtype=ms.float32) - mask = ms.Tensor(shape=[None, args.model_max_length], dtype=ms.uint8) - frames_mask = ms.Tensor(shape=[None, None], dtype=ms.bool_) - # fmt: off - num_frames = ms.Tensor(shape=[None, ], dtype=ms.float32) - height = ms.Tensor(shape=[None, ], dtype=ms.float32) - width = ms.Tensor(shape=[None, ], dtype=ms.float32) - fps = ms.Tensor(shape=[None, ], dtype=ms.float32) - ar = ms.Tensor(shape=[None, ], dtype=ms.float32) - # fmt: on - net_with_grads.set_inputs(video, caption, mask, frames_mask, num_frames, height, width, fps, ar) - logger.info("Dynamic inputs are initialized for bucket config training in Graph mode!") - - if not args.custom_train: - if args.global_bf16: - model = Model(net_with_grads, eval_network=latent_diffusion_eval, metrics=metrics, amp_level="O0") - else: - model = Model(net_with_grads, eval_network=latent_diffusion_eval, metrics=metrics) - - # callbacks - callbacks = [OverflowMonitor(), EMAEvalSwapCallback(ema)] - if args.bucket_config is None: - callbacks.append(TimeMonitor(args.log_interval)) - else: - logger.info( - "As steps per epoch are inaccurate with bucket config, TimeMonitor is disabled. 
See result.log for the actual step time" - ) - if rank_id == 0: - save_cb = EvalSaveCallback( - network=latent_diffusion_with_loss.network, - rank_id=rank_id, - ckpt_save_dir=ckpt_dir, - ema=ema, - save_ema_only=False, - ckpt_save_policy="latest_k", - ckpt_max_keep=args.ckpt_max_keep, - step_mode=step_mode, - use_step_unit=(args.ckpt_save_steps != -1), - ckpt_save_interval=ckpt_save_interval, - log_interval=args.log_interval, - start_epoch=start_epoch, - model_name=model_name, - resume_prefix_blacklist=["vae.", "swap."], - record_lr=False, - train_steps=args.train_steps, - ) - rec_cb = PerfRecorderCallback( - save_dir=args.output_path, - file_name="result_val.log", - metric_names=list(metrics.keys()), - resume=args.resume, - ) - callbacks.extend([save_cb, rec_cb]) - if args.profile: - callbacks.append(ProfilerCallbackEpoch(2, 3, "./profile_data")) - - if args.train_steps > 0: - callbacks.append(StopAtStepCallback(args.train_steps, global_step=cur_iter)) - - # 5. log and save config - if rank_id == 0: - if vae is not None: - num_params_vae, num_params_vae_trainable = count_params(vae) - else: - num_params_vae, num_params_vae_trainable = 0, 0 - num_params_latte, num_params_latte_trainable = count_params(latte_model) - num_params = num_params_vae + num_params_latte - num_params_trainable = num_params_vae_trainable + num_params_latte_trainable - key_info = "Key Settings:\n" + "=" * 50 + "\n" - key_info += "\n".join( - [ - f"MindSpore mode[GRAPH(0)/PYNATIVE(1)]: {args.mode}", - f"Jit level: {args.jit_level}", - f"Distributed mode: {args.use_parallel}", - f"Num params: {num_params:,} (latte: {num_params_latte:,}, vae: {num_params_vae:,})", - f"Num trainable params: {num_params_trainable:,}", - f"{model_name} dtype: {args.dtype}", - f"VAE dtype: {args.vae_dtype}", - f"Learning rate: {args.start_learning_rate}", - f"Batch size: {args.batch_size if args.bucket_config is None else 'Variable'}", - f"Image size: {(img_h, img_w) if args.bucket_config is None else 'Variable'}", - f"Frames: {args.num_frames if args.bucket_config is None else 'Variable'}", - f"Latent size: {latent_size if args.bucket_config is None else 'Variable'}", - f"Weight decay: {args.weight_decay}", - f"Grad accumulation steps: {args.gradient_accumulation_steps}", - f"Num epochs: {args.epochs}", - f"Loss scaler: {args.loss_scaler_type}", - f"Init loss scale: {args.init_loss_scale}", - f"Grad clipping: {args.clip_grad}", - f"Max grad norm: {args.max_grad_norm}", - f"EMA: {args.use_ema}", - f"Enable flash attention: {args.enable_flash_attention}", - f"Use recompute: {args.use_recompute}", - f"Dataset sink: {args.dataset_sink_mode}", - f"Resume training: {args.resume}", - ] - ) - key_info += "\n" + "=" * 50 - logger.info(key_info) - - logger.info("Start training...") - - with open(os.path.join(args.output_path, "args.yaml"), "w") as f: - yaml.safe_dump(vars(args), stream=f, default_flow_style=False, sort_keys=False) - - # 6. train - if not args.custom_train: - model.fit( - sink_epochs, - dataloader, - valid_dataset=val_dataloader, - valid_frequency=args.val_interval, - callbacks=callbacks, - dataset_sink_mode=args.dataset_sink_mode, - valid_dataset_sink_mode=False, # TODO: add support? 
- sink_size=args.sink_size, - initial_epoch=start_epoch, - ) - - else: - assert not args.dataset_sink_mode, "data sink not supported for custom train process currently" - - # re-count training steps and epochs - if args.train_steps > 0: - # ensure num_epochs >= train_steps/steps_per_epoch, but steps_per_epoch is uncertain with dynamic BS, the safest bound is to assume it to be 1. - # Note that it's not the actual data epochs that will be run. Training process will terminate in train_steps - num_epochs = args.train_steps - else: - assert args.epochs > 0, "args.epochs must be given and > 0 if train_steps is not specified" - # the actual data epochs to be run in this case - num_epochs = args.epochs - global_step = cur_iter # index start from 1 (after first-step network update) - - if args.ckpt_save_steps > 0: - save_by_step = True - else: - save_by_step = False - - if rank_id == 0: - ckpt_manager = CheckpointManager(ckpt_dir, "latest_k", k=args.ckpt_max_keep) - if not os.path.exists(ckpt_dir): - os.makedirs(ckpt_dir) - perf_columns = ["step", "loss", "train_time(s)", "shape"] - output_dir = ckpt_dir.replace("/ckpt", "") - if start_epoch == 0: - record = PerfRecorder(output_dir, metric_names=perf_columns) - else: - record = PerfRecorder(output_dir, resume=True) - - # ds_iter = dataloader.create_tuple_iterator(num_epochs=num_epochs - start_epoch) - # ds_iter = dataloader.create_tuple_iterator(num_epochs=-1) # infinite - end_train = False - # import pdb; pdb.set_trace() - print(f"D--: start_epoch {start_epoch}, num_epochs {num_epochs}") - for epoch in range(start_epoch + 1, num_epochs + 1): - if (args.train_steps > 0) and (global_step >= args.train_steps): - logger.warning("resumed steps >= train_steps, will end training") - break - - start_time_s = time.time() - samples_per_epoch = num_src_samples - - for step in range(samples_per_epoch): - data = src_dataset.__getitem__(step)[0] - - # loss, overflow, scaling_sens = net_with_grads(*data) - global_step += 1 - step_time = time.time() - start_time_s - - # log - print(data[0].shape) - loss_val = 0 # float(loss.asnumpy()) - logger.info( - f"Epoch {epoch}, Step {step}, loss {loss_val:.5f}, Global step {global_step}, Shape: {tuple(data[0].shape)}, Step time {step_time*1000:.2f}ms" - ) - - if (args.train_steps > 0) and (global_step >= args.train_steps): - end_train = True - break - - start_time_s = time.time() - - # dataloader.reset() - # flush_from_cache(net_with_grads) - - if end_train: - break - - logger.info("Finished training. Ending process...") - reset_op_id() - time.sleep(60) - logger.info("End") - - -if __name__ == "__main__": - logger.debug("process id:", os.getpid()) - args = parse_args() - main(args) diff --git a/examples/opensora_hpcai/scripts/train.py b/examples/opensora_hpcai/scripts/train.py index 347541ba43..d749b8f6e9 100644 --- a/examples/opensora_hpcai/scripts/train.py +++ b/examples/opensora_hpcai/scripts/train.py @@ -148,9 +148,9 @@ def init_env( logger.info("Dynamic shape mode enabled, repeat_interleave/split/chunk will be called from mint module") set_dynamic_mode(True) # if mode == 0: - # FIXME: this is a temp fix for dynamic shape training in graph mode. may remove in future version. - # can append adamw fusion flag if use nn.AdamW optimzation for acceleration - # ms.set_context(graph_kernel_flags="--disable_packet_ops=Reshape") + # FIXME: this is a temp fix for dynamic shape training in graph mode. may remove in future version. 
+    # can append adamw fusion flag if use nn.AdamW optimzation for acceleration
+    # ms.set_context(graph_kernel_flags="--disable_packet_ops=Reshape")
 
     return rank_id, device_num
 
@@ -848,7 +848,8 @@ def main(args):
             # print(data[0].shape)
             loss_val = float(loss.asnumpy())
             logger.info(
-                f"Epoch {epoch}, Step {step}, loss {loss_val:.5f}, Global step {global_step}, Shape: {tuple(data[0].shape)}, Step time {step_time*1000:.2f}ms"
+                f"Epoch {epoch}, Step {step}, loss {loss_val:.5f}, Global step {global_step},"
+                + " Shape: {tuple(data[0].shape)}, Step time {step_time*1000:.2f}ms"
             )
             if overflow:
                 logger.warning("overflow detected")
diff --git a/examples/opensora_hpcai/tools/analyze_buckt_result.py b/examples/opensora_hpcai/tools/analyze_buckt_result.py
index d5f144ed6f..b707de789f 100644
--- a/examples/opensora_hpcai/tools/analyze_buckt_result.py
+++ b/examples/opensora_hpcai/tools/analyze_buckt_result.py
@@ -1,40 +1,40 @@
-import pandas as pd
 import argparse
+import pandas as pd
+
 # result_path = 'outputs/analyze_os1.2_stage2_vcg200_ms231/merged_result.log'
+
 def analyze(result_path, save_path):
     warmup_steps = 50
-    max_step_time = 100 # step time larger than this value will be dropped, considering checkpoint saving
+    max_step_time = 100  # step time larger than this value will be dropped, considering checkpoint saving
 
-    data = pd.read_csv(result_path, sep='\t')
+    data = pd.read_csv(result_path, sep="\t")
 
     # filter warmup stage
-    data = data.iloc[warmup_steps-1:]
+    data = data.iloc[warmup_steps - 1 :]
 
     # filter out outliers
-    data = data[data['train_time(s)'] < max_step_time]

From: Samit <285365963@qq.com>
Date: Fri, 13 Sep 2024 15:36:06 +0800
Subject: [PATCH 27/32] impr bucket analysis

---
 ...ckt_result.py => analyze_bucket_result.py} | 26 ++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)
 rename examples/opensora_hpcai/tools/{analyze_buckt_result.py => analyze_bucket_result.py} (55%)

diff --git a/examples/opensora_hpcai/tools/analyze_buckt_result.py b/examples/opensora_hpcai/tools/analyze_bucket_result.py
similarity index 55%
rename from examples/opensora_hpcai/tools/analyze_buckt_result.py
rename to examples/opensora_hpcai/tools/analyze_bucket_result.py
index d5f144ed6f..807ef4fcdd 100644
--- a/examples/opensora_hpcai/tools/analyze_buckt_result.py
+++ b/examples/opensora_hpcai/tools/analyze_bucket_result.py
@@ -1,5 +1,6 @@
 import pandas as pd
 import argparse
+import math
 
 # result_path = 'outputs/analyze_os1.2_stage2_vcg200_ms231/merged_result.log'
 
@@ -14,7 +15,8 @@ def analyze(result_path, save_path):
     # filter out outliers
     data = data[data['train_time(s)'] < max_step_time]

From: Samit <285365963@qq.com>
Date: Tue, 17 Sep 2024 16:36:25 +0800
Subject: [PATCH 28/32] add stage3 balanced bucket

---
 .../opensora-v1-2/train/train_stage3_ms.yaml  | 83 +++++++++++++++++++
 1 file changed, 83 insertions(+)
 create mode 100644 examples/opensora_hpcai/configs/opensora-v1-2/train/train_stage3_ms.yaml

diff --git a/examples/opensora_hpcai/configs/opensora-v1-2/train/train_stage3_ms.yaml b/examples/opensora_hpcai/configs/opensora-v1-2/train/train_stage3_ms.yaml
new file mode 100644
index 0000000000..65708812e0
--- /dev/null
+++ b/examples/opensora_hpcai/configs/opensora-v1-2/train/train_stage3_ms.yaml
@@ -0,0 +1,83 @@
+# model
+model_version: v1.2
+pretrained_model_path: PATH_TO_YOUR_MODEL
+model_max_length: 300
+freeze_y_embedder: True
+
+noise_scheduler: rflow
+sample_method: logit-normal
+use_timestep_transform: True
+
+vae_type: OpenSoraVAE_V1_2
+vae_checkpoint: models/OpenSora-VAE-v1.2/model.ckpt
+vae_dtype: bf16
+vae_micro_batch_size: 4
+vae_micro_frame_size: 17 # keep it unchanged for the best results
+
+enable_flash_attention: True
+use_recompute: True
+
+# data
+num_parallel_workers: 2
+num_workers_dataset: 2
+prefetch_size: 2
+max_rowsize: 256
+
+# precision
+amp_level: "O2"
+dtype: bf16
+loss_scaler_type: static
+init_loss_scale: 1
+
+# mindspore params, refer to https://www.mindspore.cn/docs/zh-CN/r2.3.1/api_python/mindspore/mindspore.set_context.html
+max_device_memory: "59GB"
+jit_level: "O1"
+manual_pad: True
+
+# training hyper-params
+scheduler: "constant"
+start_learning_rate: 1.e-4
+end_learning_rate: 1.e-4
+warmup_steps: 1000
+
+clip_grad: True
+max_grad_norm: 1.0
+use_ema: True
+# ema_decay: 0.99
+
+optim: "adamw_re"
+optim_eps: 1e-15
+weight_decay: 0.
+
+# epochs: 15
+train_steps: 15000
+ckpt_save_steps: 500
+
+mask_ratios:
+  random: 0.01
+  interpolate: 0.002
+  quarter_random: 0.002
+  quarter_head: 0.002
+  quarter_tail: 0.002
+  quarter_head_tail: 0.002
+  image_random: 0.0
+  image_head: 0.22
+  image_tail: 0.005
+  image_head_tail: 0.005
+
+bucket_config:
+  # Structure: "resolution": { num_frames: [ keep_prob, batch_size ] }
+  # Setting [ keep_prob, batch_size ] to [ 0.0, 0 ] forces longer videos into smaller resolution buckets
+  "144p": {1: [1.0, 475], 51: [1.0, 51], 102: [1.0, 27], 204: [1.0, 13], 408: [1.0, 6]}
+  "256": {1: [1.0, 297], 51: [0.5, 20], 102: [0.5, 10], 204: [0.5, 6], 408: [[0.5, 0.5], 2]}
+  "240p": {1: [1.0, 297], 51: [0.5, 20], 102: [0.5, 10], 204: [0.5, 5], 408: [[0.5, 0.4], 2]}
+  "360p": {1: [1.0, 141], 51: [0.5, 8], 102: [0.5, 4], 204: [0.5, 2], 408: [[0.5, 0.3], 1]}
+  "512": {1: [1.0, 141], 51: [0.5, 8], 102: [0.5, 4], 204: [0.5, 2], 408: [[0.5, 0.2], 1]}
+  "480p": {1: [1.0, 89], 51: [0.5, 5], 102: [0.5, 2], 204: [[0.5, 0.5], 1], 408: [0.0, 0]}
+  "720p": {1: [0.3, 36], 51: [0.2, 2], 102: [0.1, 1], 204: [0.0, 0]}
+  "1024": {1: [0.3, 36], 51: [0.1, 2], 102: [0.1, 1], 204: [0.0, 0]}
+  "1080p": {1: [0.1, 5]}
+  "2048": {1: [0.05, 5]}
+
+# ---------- Validation ----------
+validate: False

From f9c1c654658932cc89dc9afd24013ab5cb6b561d Mon Sep 17 00:00:00 2001
From: Samit <285365963@qq.com>
Date: Thu, 19 Sep 2024 10:39:43 +0800
Subject: [PATCH 29/32] fix lint

---
 .../tools/analyze_bucket_result.py            | 26 ++++-----
 .../tools/annotate_stdit_mix100.py            | 55 +++++++++++++++++++
 2 files changed, 68 insertions(+), 13 deletions(-)
 create mode 100644 examples/opensora_hpcai/tools/annotate_stdit_mix100.py

diff --git a/examples/opensora_hpcai/tools/analyze_bucket_result.py b/examples/opensora_hpcai/tools/analyze_bucket_result.py
index 0a32b53bc4..11e02eb1d9 100644
--- a/examples/opensora_hpcai/tools/analyze_bucket_result.py
+++ b/examples/opensora_hpcai/tools/analyze_bucket_result.py
@@ -16,8 +16,8 @@ def analyze(result_path, save_path):
     data = data.iloc[warmup_steps - 1 :]
 
     # filter out outliers
-    data = data[data['train_time(s)'] < max_step_time]
+    data = data[data["train_time(s)"] < max_step_time]

From: Samit <285365963@qq.com>
Date: Thu, 19 Sep 2024 10:42:50 +0800
Subject: [PATCH 30/32] fix linting

---
 .../tools/annotate_stdit_mix100.py            | 29 ++++++++++---------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git a/examples/opensora_hpcai/tools/annotate_stdit_mix100.py b/examples/opensora_hpcai/tools/annotate_stdit_mix100.py
index a6ffb17d6d..610700ad7e 100644
--- a/examples/opensora_hpcai/tools/annotate_stdit_mix100.py
+++ b/examples/opensora_hpcai/tools/annotate_stdit_mix100.py
@@ -1,15 +1,15 @@
 import glob
-import os
 import json
-import random
 import math
+import os
+import random
 
-root_dir = '/home_host/ddd/workspace/datasets/mixkit-100videos/mixkit/'
-annot_fp = '/home_host/ddd/workspace/datasets/mixkit-100videos/anno_jsons/video_mixkit_65f_54735.json'
-output_csv = 'video_caption.csv'
+root_dir = "/home_host/ddd/workspace/datasets/mixkit-100videos/mixkit/"
+annot_fp = "/home_host/ddd/workspace/datasets/mixkit-100videos/anno_jsons/video_mixkit_65f_54735.json"
+output_csv = "video_caption.csv"
 
 # read paths
-video_fps = sorted(glob.glob(os.path.join(root_dir, '*/*.mp4')))
+video_fps = sorted(glob.glob(os.path.join(root_dir, "*/*.mp4")))
 # remove header
 video_fps = [fp.replace(root_dir, "") for fp in video_fps]
 print(video_fps)
@@ -19,24 +19,24 @@
 matched_videos = []
 matched_captions = []
 
-with open(annot_fp, 'r') as fp:
+with open(annot_fp, "r") as fp:
     annot_list = json.load(fp)
     for i, annot in enumerate(annot_list):
         video_path = annot["path"]
         if video_path in video_fps and video_path not in matched_videos:
-            caption = annot['cap']
-            fp_out.write("{},\"{}\"\n".format(video_path, caption))
+            caption = annot["cap"]
+            fp_out.write('{},"{}"\n'.format(video_path, caption))
             matched_videos.append(video_path)
             matched_captions.append(caption)
 
 fp_out.close()
-print('Num samples', len(matched_videos))
-print('csv saved in ', output_csv)
+print("Num samples", len(matched_videos))
+print("csv saved in ", output_csv)
 
 # split into train and test
 train_ratio = 0.8
 num_samples = len(matched_videos)
-num_train = math.ceil(num_samples * train_ratio) 
+num_train = math.ceil(num_samples * train_ratio)
 num_test = num_samples - num_train
 vc_list = [(matched_videos[i], matched_captions[i]) for i in range(num_samples)]
 random.shuffle(vc_list)
@@ -44,12 +44,13 @@
 train_set = sorted(vc_list[:num_train])
 test_set = sorted(vc_list[num_train:])
 
+
 def write_csv(vcl, save_path):
     with open(save_path, "w") as fp:
         fp.write("video,caption\n")
         for vc in vcl:
-            fp.write("{},\"{}\"\n".format(vc[0], vc[1]))
+            fp.write('{},"{}"\n'.format(vc[0], vc[1]))
+
 
 write_csv(train_set, output_csv.replace(".csv", "_train.csv"))
 write_csv(test_set, output_csv.replace(".csv", "_test.csv"))
-

From 067bb3d24720e3be8feb1dd545e259451fb9a052 Mon Sep 17 00:00:00 2001
From: Samit <285365963@qq.com>
Date: Fri, 20 Sep 2024 11:42:42 +0800
Subject: [PATCH 31/32] Update README.md

---
 examples/opensora_hpcai/README.md | 25 ++++++++++++++++-------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/examples/opensora_hpcai/README.md b/examples/opensora_hpcai/README.md
index 67f5f62776..6292a3acce 100644
--- a/examples/opensora_hpcai/README.md
+++ b/examples/opensora_hpcai/README.md
@@ -159,9 +159,18 @@ Other useful documents and links are listed below.
 
 ## Installation
 
 1. Install MindSpore according to the [official instructions](https://www.mindspore.cn/install).
-   For Ascend devices, please install **CANN driver C18 (0705)** from [here](https://repo.mindspore.cn/ascend/ascend910/20240705/) and install **MindSpore 2.3** from [here](https://www.mindspore.cn/install).
+   For Ascend devices, please install [CANN 8.0.RC2.beta1](https://www.hiascend.com/developer/download/community/result?module=cann&cann=8.0.RC2.beta1) and install [MindSpore 2.3.1](https://www.mindspore.cn/install).
+   > To reduce compilation time and training time, you may install the MindSpore 2.4 nightly build (20240904) from [here](https://repo.mindspore.cn/mindspore/mindspore/version/202409/20240904/master_20240904010023_67b5df247045f509c4ca2169bac6a551291a3111_newest/unified/aarch64/)
+
+   You may check your versions by running the following commands.
The default installation path of CANN is usually `/usr/local/Ascend/ascend-toolkit` unless you specify a custom one. + + ```bash + cat /usr/local/Ascend/ascend-toolkit/latest/version.cfg + + python -c "import mindspore;mindspore.set_context(device_target='Ascend');mindspore.run_check()" + ``` + +3. Install requirements ```bash pip install -r requirements.txt ``` @@ -624,11 +633,13 @@ Here ✅ means that the data is seen during training, and 🆗 means although no We evaluate the training performance of Open-Sora v1.2 on the MixKit dataset with high-resolution videos (1080P, duration 12s to 100s). The results are as follows. -| Model | Context | jit_level | Precision | BS | NPUs | Size (TxHxW) | Train T. (s/step) | -|:------------|:-------------|:--------|:---------:|:--:|:----:|:----------------------:|:-----------------:| -| STDiT3-XL/2 | D910\*-[C18](https://repo.mindspore.cn/ascend/ascend910/20240705/)-[MS2.3](https://www.mindspore.cn/install) | O1 | BF16 | 1 | 8 | 51x720x1280 | **14.60** | -| STDiT3-XL/2 | D910\*-[C18](https://repo.mindspore.cn/ascend/ascend910/20240705/)-[MS2.3.1(0726)](https://repo.mindspore.cn/mindspore/mindspore/version/202407/20240726/master_20240726220021_4c913fb116c83b9ad28666538483264da8aebe8c_newest/unified/) | O1 | BF16 | 1 | 8 | Stage 2 Dyn. | **33.10** | -| STDiT3-XL/2 | D910\*-[C18](https://repo.mindspore.cn/ascend/ascend910/20240705/)-[MS2.3.1(0726)](https://repo.mindspore.cn/mindspore/mindspore/version/202407/20240726/master_20240726220021_4c913fb116c83b9ad28666538483264da8aebe8c_newest/unified/) | O1 | BF16 | 1 | 8 | Stage 3 Dyn. | **37.7** | +| Model | Context | jit_level | Precision | BS | NPUs | Size (TxHxW) | Train T. (s/step) | config | +|:------------|:-------------|:--------|:---------:|:--:|:----:|:----------------------:|:-----------------:|:-----------------:| +| STDiT3-XL/2 | D910\*-[C18](https://repo.mindspore.cn/ascend/ascend910/20240705/)-[MS2.3](https://www.mindspore.cn/install) | O1 | BF16 | 1 | 8 | 51x720x1280 | **14.60** | [yaml](configs/opensora-v1-2/train/train_720x1280x51.yaml) | +| STDiT3-XL/2 | D910\*-[C18](https://repo.mindspore.cn/ascend/ascend910/20240705/)-[MS2.3.1(0726)](https://repo.mindspore.cn/mindspore/mindspore/version/202407/20240726/master_20240726220021_4c913fb116c83b9ad28666538483264da8aebe8c_newest/unified/) | O1 | BF16 | 1 | 8 | Stage 2 Dyn. | **33.10** | [yaml](configs/opensora-v1-2/train/train_stage2.yaml) | +| STDiT3-XL/2 | D910\*-[C18](https://repo.mindspore.cn/ascend/ascend910/20240705/)-[MS2.3.1(0726)](https://repo.mindspore.cn/mindspore/mindspore/version/202407/20240726/master_20240726220021_4c913fb116c83b9ad28666538483264da8aebe8c_newest/unified/) | O1 | BF16 | 1 | 8 | Stage 3 Dyn. | **34** | [yaml](configs/opensora-v1-2/train/train_stage3.yaml) | + + > Context: {G:GPU, D:Ascend}{chip type}-{CANN version}-{mindspore version}; "Dyn." is short for dynamic shape. Note that the step time of dynamic training can be influenced by the resolution and duration distribution of the source videos. Training performance is under optimization. 
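
For context on the "Dyn." rows in the table above: patch 24 ("acc by add Symbol") registers symbolic inputs so that a single compiled graph serves every bucket batch size instead of recompiling per shape. Below is a minimal, self-contained sketch of the same technique; the toy network and shapes are illustrative only, assuming MindSpore >= 2.3:

```python
import numpy as np
import mindspore as ms
from mindspore import Tensor, nn

ms.set_context(mode=ms.GRAPH_MODE)

net = nn.Dense(16, 4)  # stand-in for the real training cell
bs = ms.Symbol(unique=True)  # one symbolic batch dim, shared by all inputs
net.set_inputs(Tensor(shape=[bs, 16], dtype=ms.float32))

# the graph is compiled once and then reused for any runtime batch size
for n in (2, 8):
    print(net(Tensor(np.ones((n, 16), np.float32))).shape)
```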
From 5c5e382663325f170e28621ace3c3a8ea0c601e5 Mon Sep 17 00:00:00 2001
From: Samit <285365963@qq.com>
Date: Fri, 20 Sep 2024 16:13:24 +0800
Subject: [PATCH 32/32] add comments

---
 examples/opensora_hpcai/README.md             |  2 +-
 .../opensora/models/layers/blocks.py          |  7 +------
 .../opensora/models/vae/modules.py            | 14 -------------
 .../opensora/schedulers/rectified_flow.py     |  2 --
 examples/opensora_hpcai/scripts/train.py      | 20 +++++++++----------
 5 files changed, 11 insertions(+), 34 deletions(-)

diff --git a/examples/opensora_hpcai/README.md b/examples/opensora_hpcai/README.md
index 6292a3acce..8055800671 100644
--- a/examples/opensora_hpcai/README.md
+++ b/examples/opensora_hpcai/README.md
@@ -166,7 +166,7 @@ Other useful documents and links are listed below.
    ```bash
    cat /usr/local/Ascend/ascend-toolkit/latest/version.cfg
-   
+
    python -c "import mindspore;mindspore.set_context(device_target='Ascend');mindspore.run_check()"
    ```
 
diff --git a/examples/opensora_hpcai/opensora/models/layers/blocks.py b/examples/opensora_hpcai/opensora/models/layers/blocks.py
index c4b17422c8..725f9e0bb5 100644
--- a/examples/opensora_hpcai/opensora/models/layers/blocks.py
+++ b/examples/opensora_hpcai/opensora/models/layers/blocks.py
@@ -26,9 +26,6 @@ def __init__(self, hidden_size, eps=1e-6):
         self.variance_epsilon = eps
 
     def construct(self, hidden_states: Tensor):
-        # variance = hidden_states.pow(2).mean(-1, keep_dims=True)
-        # hidden_states = hidden_states * ops.rsqrt(variance + self.variance_epsilon)
-        # return self.gamma * hidden_states
         return ops.rms_norm(hidden_states, self.gamma, self.variance_epsilon)[0]
 
 
@@ -326,12 +323,10 @@ def __init__(self, normalized_shape, eps=1e-5, elementwise_affine: bool = True,
         else:
             self.gamma = ops.ones(normalized_shape, dtype=dtype)
             self.beta = ops.zeros(normalized_shape, dtype=dtype)
-        # self.layer_norm = ops.LayerNorm(-1, -1, epsilon=eps)
 
     def construct(self, x: Tensor):
-        # x, _, _ = self.layer_norm(x, self.gamma, self.beta)
-
         normalized_shape = x.shape[-1:]
+        # mint layer_norm fuses the operations in layer normalization and is faster than ops.LayerNorm
         x = mint.nn.functional.layer_norm(x, normalized_shape, self.gamma, self.beta, self.eps)
         return x
 
diff --git a/examples/opensora_hpcai/opensora/models/vae/modules.py b/examples/opensora_hpcai/opensora/models/vae/modules.py
index 83703faef6..b98a7ca45a 100644
--- a/examples/opensora_hpcai/opensora/models/vae/modules.py
+++ b/examples/opensora_hpcai/opensora/models/vae/modules.py
@@ -276,20 +276,6 @@ def construct(self, x):
         temb = None
 
         # downsampling
-        """
-        hs = [self.conv_in(x)]
-        for i_level in range(self.num_resolutions):
-            for i_block in range(self.num_res_blocks):
-                h = self.down[i_level].block[i_block](hs[-1], temb)
-                if len(self.down[i_level].attn) > 0:
-                    h = self.down[i_level].attn[i_block](h)
-                hs.append(h)
-            if i_level != self.num_resolutions - 1:
-                hs.append(self.down[i_level].downsample(hs[-1]))
-
-        # middle
-        h = hs[-1]
-        """
         hs = self.conv_in(x)
         for i_level in range(self.num_resolutions):
             for i_block in range(self.num_res_blocks):
diff --git a/examples/opensora_hpcai/opensora/schedulers/rectified_flow.py b/examples/opensora_hpcai/opensora/schedulers/rectified_flow.py
index 0464dd5387..0be394ca92 100644
--- a/examples/opensora_hpcai/opensora/schedulers/rectified_flow.py
+++ b/examples/opensora_hpcai/opensora/schedulers/rectified_flow.py
@@ -5,8 +5,6 @@
 except ImportError:
     from typing_extensions import Literal  # FIXME: python 3.7
 
-# import numpy as np
-# import mindspore as ms
 from tqdm import tqdm
 
 from mindspore import Tensor, dtype, ops
diff --git a/examples/opensora_hpcai/scripts/train.py b/examples/opensora_hpcai/scripts/train.py
index 347541ba43..d554b62e88 100644
--- a/examples/opensora_hpcai/scripts/train.py
+++ b/examples/opensora_hpcai/scripts/train.py
@@ -147,10 +147,6 @@ def init_env(
     if dynamic_shape:
         logger.info("Dynamic shape mode enabled, repeat_interleave/split/chunk will be called from mint module")
         set_dynamic_mode(True)
-    # if mode == 0:
-    # FIXME: this is a temp fix for dynamic shape training in graph mode. may remove in future version.
-    # can append adamw fusion flag if use nn.AdamW optimzation for acceleration
-    # ms.set_context(graph_kernel_flags="--disable_packet_ops=Reshape")
 
     return rank_id, device_num
 
@@ -563,14 +559,16 @@ def main(args):
 
     # compute total steps and data epochs (in unit of data sink size)
     if args.dataset_sink_mode and args.sink_size != -1:
-        steps_per_sink = args.sink_size
+        # in data sink mode, the data sink size determines the number of training steps per epoch.
+        steps_per_epoch = args.sink_size
     else:
-        steps_per_sink = dataset_size
+        # without data sink, the number of training steps is determined by the number of data batches in the whole training set.
+        steps_per_epoch = dataset_size
 
     if args.train_steps == -1:
         assert args.epochs != -1
         total_train_steps = args.epochs * dataset_size
-        sink_epochs = math.ceil(total_train_steps / steps_per_sink)
+        sink_epochs = math.ceil(total_train_steps / steps_per_epoch)
     else:
         total_train_steps = args.train_steps
         # asume one step need one whole epoch data to ensure enough batch loading for training
         sink_epochs = total_train_steps
@@ -585,11 +583,11 @@ def main(args):
             ckpt_save_interval = args.ckpt_save_steps
         else:
             # still need to count interval in sink epochs
-            ckpt_save_interval = max(1, args.ckpt_save_steps // steps_per_sink)
-            if args.ckpt_save_steps % steps_per_sink != 0:
+            ckpt_save_interval = max(1, args.ckpt_save_steps // steps_per_epoch)
+            if args.ckpt_save_steps % steps_per_epoch != 0:
                 logger.warning(
                     f"`ckpt_save_steps` must be times of sink size or dataset_size under dataset sink mode."
                     f"Checkpoint will be saved every {ckpt_save_interval * steps_per_epoch} steps."
                 )
         step_mode = step_mode if args.step_mode is None else args.step_mode
@@ -849,7 +847,7 @@ def main(args):
             loss_val = float(loss.asnumpy())
             logger.info(
                 f"Epoch {epoch}, Step {step}, loss {loss_val:.5f}, Global step {global_step},"
-                + " Shape: {tuple(data[0].shape)}, Step time {step_time*1000:.2f}ms"
+                + f" Shape: {tuple(data[0].shape)}, Step time {step_time*1000:.2f}ms"
             )
             if overflow:
                 logger.warning("overflow detected")
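
As a companion to the comment added in blocks.py above: `mint.nn.functional.layer_norm` performs the whole normalization in one fused kernel rather than separate mean/variance/scale ops. A minimal sketch of the call, with illustrative shapes and eps (assuming MindSpore >= 2.3 with the mint API available, e.g. on Ascend):

```python
import numpy as np
import mindspore as ms
from mindspore import Tensor, mint, ops

x = Tensor(np.random.randn(2, 300, 1152), ms.float32)  # (batch, tokens, hidden)
gamma = ops.ones((1152,), ms.float32)  # scale, i.e. LayerNorm gamma
beta = ops.zeros((1152,), ms.float32)  # shift, i.e. LayerNorm beta

# normalize over the last axis with a single fused kernel
y = mint.nn.functional.layer_norm(x, x.shape[-1:], gamma, beta, 1e-5)
print(y.shape)  # (2, 300, 1152)
```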