Implementing GPT2 from Scratch

Note: this chapter walks through implementing a model from scratch, assuming that the basic capabilities MindFormers already provides (Base_Trainer, Task, etc.) are in place.

1 Model Construction and Registration (Using a Pretrained Language Model as an Example)

1.1 Understand the Model Architecture

GPT2 model architecture diagram

  • Review authoritative open-source code to pin down the implementation details from the paper. GPT2LMHeadModel is identified as the pretraining module, so it and its dependent modules need to be implemented in MindFormers.

    Huggingface-GPT2:
      modeling_gpt2.py
          ├── GPT2Attention # multi-head attention module
          ├── GPT2MLP # transformer feed-forward layer
          ├── GPT2Block # transformer block
          ├── GPT2Model # GPT2 backbone
          └── GPT2LMHeadModel # pretraining module
    
  • Analyze the modules to be implemented and identify what MindFormers already provides. GPT2Block above is the single-layer implementation of the transformer decoder, and GPT2Attention and GPT2MLP are the modules GPT2Block depends on. MindFormers has already integrated a complete transformer, so GPT2Block can be implemented on top of the MindFormers transformer.

    Mindformers
      ├── mindformers
          ├── modules
              ├── transformer
                  └── transformer.py
    
  • Implement the GPT2 network based on the MindFormers transformer.

    The final directory layout of the GPT2 network is as follows:

    Mindformers
      ├── mindformers
          ├── models
              ├── gpt2
                  ├── __init__.py
                  ├── convert_weight.py # weight conversion script
                  ├── gpt2.py # model network implementation
                  ├── gpt2_config.py # model configuration
                  ├── gpt2_processor.py # preprocessing script
                  ├── gpt2_tokenizer.py # tokenizer script
                  └── gpt_modules.py # model transformer-layer implementation
    

    For the full code under this directory, see:

    __init__.py

    convert_weight.py

    gpt2.py

    gpt2_config.py

    gpt2_processor.py

    gpt2_tokenizer.py

    gpt_modules.py

  • Implement the GPT2 backbone network GPT2Model (see gpt2.py for the full code).

    # For the full implementation, see Mindformers/mindformers/models/gpt2/gpt2.py
    class GPT2Model(nn.Cell):
        """ function description """
        def __init__(self, config):
            super(GPT2Model, self).__init__()

            self.embedding = GPTEmbeddingLayer(config) # embedding layer
            self.layernorm = LayerNorm((config.hidden_size,)).to_float(config.layernorm_compute_type)

            self.num_layers = config.num_layers
            moe_config = config.moe_config  # MoE settings taken from the config (simplified here)
            self.blocks = nn.CellList()
            for i in range(config.num_layers):
                # GPTTransformerDecoderLayer is the transformer block; if needed, it can be
                # rewritten by inheriting the decoder from the MindFormers transformer
                block = GPTTransformerDecoderLayer(
                    hidden_size=config.hidden_size,
                    batch_size=config.batch_size,
                    ffn_hidden_size=config.hidden_size * config.expand_ratio,
                    seq_length=config.seq_length,
                    num_heads=config.num_heads,
                    attention_dropout_rate=config.attention_dropout_rate,
                    hidden_dropout_rate=config.hidden_dropout_rate,
                    hidden_act=config.hidden_act,
                    param_init_type=config.param_init_type,
                    layernorm_compute_type=config.layernorm_compute_type,
                    softmax_compute_type=config.softmax_compute_type,
                    parallel_config=config.parallel_config.dp_mp_config,
                    moe_config=moe_config)
                self.blocks.append(block)

            self.input_position = Tensor(np.arange(config.seq_length), mstype.int32)

        def construct(self, input_ids, attention_mask):
            # word/position embedding
            input_embedding, embedding_table = self.embedding(input_ids, self.input_position)

            # multi-layer multi-head attention
            hidden_states = input_embedding
            for i in range(self.num_layers):
                hidden_states = self.blocks[i](hidden_states, attention_mask)
            # layernorm
            output_state = self.layernorm(hidden_states)

            return output_state, embedding_table
    
  • Implement the GPT2 pretraining module GPT2LMHeadModel on top of GPT2Model (see gpt2.py for the full code).

    __all__ = ['GPT2LMHeadModel']: exposes the pretraining interface so users can call it from outside.
    @MindFormerRegister.register(MindFormerModuleType.MODELS): MindFormers' registration mechanism; it registers the class into the corresponding module so it can be instantiated from a configuration file.
    _support_list = MindFormerBook.get_model_support_list()['gpt2']: used by the high-level APIs to look up available models, which can then be instantiated via from_pretrained. A usage sketch follows the code below.

    # For the full implementation, see MindFormers
    __all__ = ['GPT2LMHeadModel'] # public interface

    @MindFormerRegister.register(MindFormerModuleType.MODELS) # register under MODELS
    class GPT2LMHeadModel(PreTrainedModel):
        """ function description """
        _support_list = MindFormerBook.get_model_support_list()['gpt2']

        def __init__(self, config: GPT2Config = None):
            config = config if config is not None else GPT2Config()
            super(GPT2LMHeadModel, self).__init__(config, auto_prefix=True)

            # used to generate the lower-triangular attention matrix
            self.get_attention_mask = AttentionMask(seq_length=config.seq_length)

            self.backbone = GPT2Model(config) # gpt2 backbone
            # projection from the backbone's output hidden states to the vocabulary
            self.head = GPTHead(hidden_size=config.hidden_size, vocab_size=config.vocab_size)
            # cross-entropy loss; the loss parallel config is derived from config.parallel_config (simplified here)
            loss_parallel_config = config.parallel_config.dp_mp_config
            self.loss = CrossEntropyLoss(parallel_config=loss_parallel_config)

        def construct(self, input_ids, attention_mask=None):
            """ parameters description """
            # the full implementation derives the shifted inputs/labels from input_ids
            tokens = input_ids[:, :-1]
            labels = input_ids[:, 1:]
            loss_mask = attention_mask
            # generate the lower-triangular attention matrix
            attention_mask = self.get_attention_mask(attention_mask)
            # get the backbone output
            output_states, embedding_table = self.backbone(tokens, attention_mask)
            # map the backbone output to the vocabulary space
            logits = self.head(output_states, embedding_table)

            if self.phase != 'train':
                # return logits when not training
                return logits, tokens, loss_mask
            # compute and return the loss during training
            loss = self.loss(logits, labels, loss_mask)
            return loss
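
    The following is a minimal usage sketch (not from the MindFormers source; shapes and names follow the code above) showing how the config, backbone, head, and loss fit together:

    # hypothetical usage sketch, assuming the gpt2 module above is importable
    import numpy as np
    import mindspore as ms
    from mindformers.models.gpt2 import GPT2Config, GPT2LMHeadModel

    config = GPT2Config(seq_length=1024, num_layers=12, num_heads=12, hidden_size=768)
    model = GPT2LMHeadModel(config)
    model.set_train(True)

    # input_ids carries seq_length + 1 tokens so that the one-token shift in
    # construct yields tokens/labels of length seq_length
    input_ids = ms.Tensor(np.random.randint(0, config.vocab_size, (1, config.seq_length + 1)), ms.int32)
    attention_mask = ms.Tensor(np.ones((1, config.seq_length)), ms.float32)
    loss = model(input_ids, attention_mask)  # scalar loss in the 'train' phase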
    
  • Write the model configuration file gpt2_config.py (see gpt2_config.py for the full code).

    __all__ = ['GPT2Config']: exposes the configuration interface so users can call it from outside.
    @MindFormerRegister.register(MindFormerModuleType.CONFIG): MindFormers' registration mechanism; it registers the class into the corresponding module so it can be instantiated from a configuration file.
    _support_list = MindFormerBook.get_config_support_list()['gpt2']: used by the high-level APIs to look up available configurations, which can then be instantiated via from_pretrained. A registry-based instantiation sketch follows the code below.

    # For the full implementation, see Mindformers/mindformers/models/gpt2/gpt2_config.py
    __all__ = ['GPT2Config']

    @MindFormerRegister.register(MindFormerModuleType.CONFIG) # register under CONFIG
    class GPT2Config(PretrainedConfig):
        """ class description """
    
        _support_list = MindFormerBook.get_config_support_list()['gpt2']
    
        def __init__(self,
                     seq_length: int = 1024,
                     vocab_size: int = 50257,
                     hidden_size: int = 768,
                     num_layers: int = 12,
                     num_heads: int = 12,
                     expand_ratio: int = 4,
                     embedding_dropout_prob: float = 0.1,
                     hidden_dropout_rate: float = 0.1,
                     attention_dropout_rate: float = 0.1,
                     param_init_type: str = "float32",
                     layernorm_compute_type: str = "float32",
                     softmax_compute_type: str = "float32",
                     compute_dtype: str = "float16",
                     hidden_act: str = 'gelu',
                     **kwargs):
            super(GPT2Config, self).__init__(**kwargs)
            self.seq_length = seq_length
            self.vocab_size = vocab_size
            self.hidden_size = hidden_size
            self.num_layers = num_layers
            self.num_heads = num_heads
            self.expand_ratio = expand_ratio
            self.embedding_dropout_prob = embedding_dropout_prob
            self.hidden_dropout_rate = hidden_dropout_rate
            self.attention_dropout_rate = attention_dropout_rate
            self.param_init_type = convert_mstype(param_init_type)
            self.layernorm_compute_type = convert_mstype(layernorm_compute_type)
            self.softmax_compute_type = convert_mstype(softmax_compute_type)
            self.compute_dtype = convert_mstype(compute_dtype)
            self.hidden_act = hidden_act
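
    Because GPT2Config is registered under CONFIG, the high-level APIs can also build it from a plain dict carrying a type field, which is effectively what happens when a yaml file is parsed. A minimal sketch (the dict contents are illustrative, and get_instance_from_cfg is assumed to be the registry's factory method):

    from mindformers.tools.register import MindFormerRegister, MindFormerModuleType

    # what the yaml loader effectively does with a model_config section
    config_dict = {"type": "GPT2Config", "num_layers": 12, "hidden_size": 768}
    config = MindFormerRegister.get_instance_from_cfg(config_dict, MindFormerModuleType.CONFIG)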
    
  • Implement the Trainer logic so that trainer.train/evaluate/predict can launch the training/evaluation/inference flows.

    The final directory structure is:

    Mindformers
      ├── mindformers
          ├── trainer
              ├── causal_language_modeling
                  ├── __init__.py
                  └── causal_language_modeling.py # train/evaluate/predict implementation script
    

    For the full code under this directory, see:

    __init__.py

    causal_language_modeling.py

    @MindFormerRegister.register(MindFormerModuleType.TRAINER): MindFormers' registration mechanism; it registers the class into the corresponding module so it can be instantiated from a configuration file.
    train/evaluate/predict: MindFormers already implements the BaseTrainer class with the basic training flow; if your training flow has special requirements, you can modify the implementation yourself. A usage sketch follows the code below.

    # For the full implementation, see Mindformers/mindformers/trainer/causal_language_modeling/causal_language_modeling.py
    @MindFormerRegister.register(MindFormerModuleType.TRAINER)
    class CausalLanguageModelingTrainer(BaseTrainer):
        """ class description """
        def __init__(self, model_name: str = None):
            super(CausalLanguageModelingTrainer, self).__init__("text_generation", model_name)
    
        def train(self,
                  config: Optional[Union[dict, MindFormerConfig, ConfigArguments, TrainingArguments]] = None,
                  network: Optional[Union[Cell, PreTrainedModel]] = None,
                  dataset: Optional[Union[BaseDataset, GeneratorDataset]] = None,
                  wrapper: Optional[TrainOneStepCell] = None,
                  optimizer: Optional[Optimizer] = None,
                  callbacks: Optional[Union[Callback, List[Callback]]] = None,
                  **kwargs):
            """ function description """
            self.training_process(
                config=config,
                network=network,
                callbacks=callbacks,
                dataset=dataset,
                wrapper=wrapper,
                optimizer=optimizer,
                **kwargs)
    
        def evaluate(self,
                     config: Optional[Union[dict, MindFormerConfig, ConfigArguments, TrainingArguments]] = None,
                     network: Optional[Union[Cell, PreTrainedModel]] = None,
                     dataset: Optional[Union[BaseDataset, GeneratorDataset]] = None,
                     callbacks: Optional[Union[Callback, List[Callback]]] = None,
                     compute_metrics: Optional[Union[dict, set]] = None,
                     **kwargs):
            """ function description """
            metric_name_list = [metric['type'] for metric in config.metric]
            if len(metric_name_list) == 1:
                metric_name = metric_name_list[0]
                kwargs.setdefault("metric_name", metric_name)
    
            self.evaluate_process(
                    config=config,
                    network=network,
                    dataset=dataset,
                    callbacks=callbacks,
                    compute_metrics=compute_metrics,
                    **kwargs)
    
        def predict(self,
                    config: Optional[Union[dict, MindFormerConfig, ConfigArguments, TrainingArguments]] = None,
                    input_data: Optional[Union[str, list, GeneratorDataset]] = None,
                    network: Optional[Union[Cell, PreTrainedModel]] = None,
                    tokenizer: Optional[PreTrainedTokenizerBase] = None,
                    **kwargs):
            """ function description """
            # supports passing in strings and datasets
            if input_data is None:
                input_data = config.input_data
    
            if not isinstance(input_data, (str, list, GeneratorDataset)):
                raise ValueError("Input data's type must be one of "
                                 f"[str, list, GeneratorDataset], but got type {type(input_data)}")
    
            if isinstance(input_data, str) and os.path.isfile(input_data):
                with open(input_data, 'r') as fp:
                    input_data_list = []
                    for line in fp:
                        # collect one prompt per line (extend(line) would split the line into characters)
                        input_data_list.append(line.strip())
                input_data = input_data_list
    
            return self.predict_process(config=config,
                                        input_data=input_data,
                                        task='text_generation',
                                        network=network,
                                        tokenizer=tokenizer,
                                        **kwargs)
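
    A usage sketch for the trainer above (the yaml path is illustrative; the registered task name is "text_generation"):

    from mindformers.tools.register import MindFormerConfig

    # build the trainer registered above and launch training from a yaml config
    trainer = CausalLanguageModelingTrainer(model_name="gpt2")
    config = MindFormerConfig("configs/gpt2/run_gpt2.yaml")
    trainer.train(config=config)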
    
  • Write the yaml configuration files, adding all of the configuration needed for training/evaluation/inference.

    Separate yaml files are needed for different parameter scales.

    The final directory structure is:

    Mindformers
        ├── configs
            ├── gpt2
                ├── run_gpt2.yaml # gpt2 small configuration
                ├── run_gpt2_13b.yaml # gpt2 13b configuration
                ├── run_gpt2_52b.yaml # gpt2 52b configuration
                ├── run_gpt2_lora.yaml # gpt2 small lora finetuning configuration
                ├── run_gpt2_txtcls.yaml # gpt2 small text-classification configuration
                ├── run_gpt2_xl.yaml # gpt2 xlarge configuration
                └── run_gpt2_xl_lora.yaml # gpt2 xlarge lora finetuning configuration
    

    For the full code under this directory, see:

    run_gpt2.yaml

    run_gpt2_13b.yaml

    run_gpt2_52b.yaml

    run_gpt2_lora.yaml

    run_gpt2_txtcls.yaml

    run_gpt2_xl.yaml

    run_gpt2_xl_lora.yaml

    # For the full configuration, see MindFormers
    run_gpt2.yaml # configuration file name
        ├── context # context configuration
        ├── runner # batch_size, wrapper, and similar settings
        ├── parallel # distributed configuration (the network must support the distributed logic)
        ├── recompute_config # recomputation configuration
        ├── profile # profiling configuration
        ├── trainer # trainer configuration
          ├── type # the TRAINER registered above; instantiated here via type
          └── model_name # model name
        ├── train/eval_dataset # training/evaluation dataset configuration
        ├── model # model configuration
          ├── model # model parameter configuration
              ├── type # the CONFIG registered above; instantiated here via type
              ├── num_layers # number of transformer layers
              ├── num_heads # number of attention heads
              └── ......
          ├── arch
              └── GPT2LMHeadModel # the MODELS registration above; instantiated here via this name
        ├── lr_schedule # learning-rate configuration (for custom schedules, implement and register them first; see mindformers/core/lr)
        ├── optimizer # optimizer configuration (for custom optimizers, implement and register them first; see mindformers/core/optim)
        ├── callbacks # callback configuration
        └── metric # evaluation metrics (for custom metrics, implement and register them first; see mindformers/core/metric)
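
    To make the mapping between this yaml tree and the registered classes concrete, the sketch below (simplified; get_instance_from_cfg is assumed to be the registry's factory method) shows how the trainer is resolved from the parsed config by its type field; models, datasets, and metrics are resolved the same way from their own sections:

    from mindformers.tools.register import (MindFormerConfig, MindFormerRegister,
                                            MindFormerModuleType)

    config = MindFormerConfig("configs/gpt2/run_gpt2.yaml")
    # trainer.type -> CausalLanguageModelingTrainer, registered under TRAINER above
    trainer = MindFormerRegister.get_instance_from_cfg(config.trainer, MindFormerModuleType.TRAINER)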
    
  • Add the model's lookup name in Mindformers/mindformers/mindformer_book.py so it can be instantiated via the from_pretrained method (optional).

    # For the full implementation, see Mindformers/mindformers/mindformer_book.py
    _CONFIG_SUPPORT_LIST = OrderedDict([
        ('gpt2', [
            'gpt2',
        ]),
    ])

    _MODEL_SUPPORT_LIST = OrderedDict([
        ('gpt2', [
            'gpt2',
        ]),
    ])
    
    # test from_pretrained
    from mindformers import GPT2Config, GPT2LMHeadModel

    print(GPT2Config._support_list)
    # ['gpt2']
    config = GPT2Config.from_pretrained("gpt2")

    print(GPT2LMHeadModel._support_list)
    # ['gpt2']
    model = GPT2LMHeadModel.from_pretrained("gpt2")
    
  • Code testing

    For dataset construction and test code, refer to the model documentation.

    After completing the model implementation on MindFormers, it is recommended to write model documentation.

2 Dataset Construction and Registration

2.1 Processor

  • The Processor initializes the tokenizer and preprocesses the input data. MindFormers already implements the ProcessorMixin class; if there are no special requirements, inherit this base class directly and add your processing logic, or implement your own if needed.

  • @MindFormerRegister.register(MindFormerModuleType.PROCESSOR): this decorator registers GPT2Processor into the corresponding module, so it can be instantiated from a configuration file.

  • __all__ = ['GPT2Processor']: exposes the interface so users can call it from outside.

  • _support_list = MindFormerBook.get_processor_support_list()['gpt2']: used by the high-level APIs to look up available models; GPT2Processor can be instantiated via from_pretrained. A usage sketch follows the code below.

    # gpt2_processor.py
    __all__ = ['GPT2Processor']
    
    @MindFormerRegister.register(MindFormerModuleType.PROCESSOR)
    class GPT2Processor(ProcessorMixin):
      """class description"""
      _support_list = MindFormerBook.get_processor_support_list()['gpt2']
    
      def __init__(self, tokenizer=None,
                  max_length=128, padding='max_length', return_tensors='ms'):
          super(GPT2Processor, self).__init__(
              tokenizer=tokenizer,
              max_length=max_length,
              padding=padding,
              return_tensors=return_tensors
          )
    
      def __call__(self, text_input=None, image_input=None):
          """call function description"""
          output = {}
          if text_input is not None and self.tokenizer:
              if not isinstance(self.tokenizer, PreTrainedTokenizerBase):
                  raise TypeError(f"tokenizer should inherited from the PreTrainedTokenizerBase,"
                                  f" but got {type(self.tokenizer)}.")
              # add a batch dimension to the input data
              if isinstance(text_input, str):
                  text_input = [text_input]
              text_output = self.tokenizer(text_input, return_tensors=self.return_tensors,
                                           max_length=self.max_length,
                                           padding=self.padding)["input_ids"]
              # store the result in the output dict
              output['text'] = text_output
          return output
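
    A usage sketch for the processor (assuming the registered 'gpt2' tokenizer files can be fetched via from_pretrained):

    from mindformers import GPT2Processor, GPT2Tokenizer

    processor = GPT2Processor(tokenizer=GPT2Tokenizer.from_pretrained("gpt2"), max_length=128)
    batch = processor("An increasing sequence: one,")
    print(batch['text'].shape)  # (1, 128): tokenized and padded to max_length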
    

2.2 Tokenizer

  • A tokenizer converts raw text into tokens that are then fed into the model for processing. MindFormers already implements the tokenizer base class; GPT2Tokenizer can directly inherit from it (here PreTrainedTokenizer) and implement the specific logic, or you can write the logic from scratch if needed.

  • @MindFormerRegister.register(MindFormerModuleType.TOKENIZER): this decorator registers GPT2Tokenizer into the corresponding module, so it can be instantiated from a configuration file.

  • __all__ = ['GPT2Tokenizer']: exposes the interface so users can call it from outside.

  • _support_list = MindFormerBook.get_tokenizer_support_list()['gpt2']: used by the high-level APIs to look up available models; GPT2Tokenizer can be instantiated via from_pretrained. A round-trip sketch follows the code below.

    # gpt2_tokenizer.py
    # Only part of the core code is shown below; see MindFormers for the full implementation.
    @MindFormerRegister.register(MindFormerModuleType.TOKENIZER)
    class GPT2Tokenizer(PreTrainedTokenizer):
        """class description"""
        vocab_files_names = VOCAB_FILES_NAMES
        model_input_names = ["input_ids", "token_type_ids", "attention_mask"]
        FILE_LIST = ['tokenizer_config.json']
        _support_list = MindFormerBook.get_tokenizer_support_list()['gpt2']
        def __init__(self, vocab_file, merges_file,
                     errors="replace",
                     unk_token="<|endoftext|>",
                     bos_token="<|endoftext|>",
                     eos_token="<|endoftext|>",
                     pad_token="<|endoftext|>",
                     add_prefix_space=False,
                     add_bos_token=True,
                     add_eos_token=True,
                     **kwargs):
            # wrap special tokens with AddedToken so they are handled as single units
            bos_token = AddedToken(bos_token, lstrip=False, rstrip=False) if isinstance(bos_token, str) else bos_token
            eos_token = AddedToken(eos_token, lstrip=False, rstrip=False) if isinstance(eos_token, str) else eos_token
            unk_token = AddedToken(unk_token, lstrip=False, rstrip=False) if isinstance(unk_token, str) else unk_token
            pad_token = AddedToken(pad_token, lstrip=False, rstrip=False) if isinstance(pad_token, str) else pad_token
            super().__init__(errors=errors, unk_token=unk_token, bos_token=bos_token, eos_token=eos_token,
                             pad_token=pad_token, add_prefix_space=add_prefix_space, **kwargs)
            self.add_bos_token = add_bos_token
            self.add_eos_token = add_eos_token
    
            # read the vocab file to build the encoder/decoder
            with open(vocab_file, encoding="utf-8") as vocab_handle:
                self.encoder = json.load(vocab_handle)
            self.decoder = {v: k for k, v in self.encoder.items()}
            self.errors = errors  # how to handle errors in decoding
            self.byte_encoder = bytes_to_unicode()
            self.byte_decoder = {v: k for k, v in self.byte_encoder.items()}
    
            # read the merges file to build the BPE tokenizer
            with open(merges_file, encoding="utf-8") as merges_handle:
                bpe_merges = merges_handle.read().split("\n")[1:-1]
            bpe_merges = [tuple(merge.split()) for merge in bpe_merges]
            self.bpe_ranks = dict(zip(bpe_merges, range(len(bpe_merges))))
            self.cache = {}
            self.add_prefix_space = add_prefix_space
    
            # note: the \p{...} classes require the third-party `regex` package (imported as re)
            self.pat = re.compile(r"""'s|'t|'re|'ve|'m|'ll|'d| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+""")
    
        ...
    
        def bpe(self, token):
            """GPT2 BPE merge implementation"""
            if token in self.cache:
                return self.cache[token]
            word = tuple(token)
            pairs = get_pairs(word)
    
            if not pairs:
                return token
    
            while True:
                bigram = min(pairs, key=lambda pair: self.bpe_ranks.get(pair, float("inf")))
                if bigram not in self.bpe_ranks:
                    break
                first, second = bigram
                new_word = []
                i = 0
                while i < len(word):
                    try:
                        j = word.index(first, i)
                    except ValueError:
                        new_word.extend(word[i:])
                        break
                    else:
                        new_word.extend(word[i:j])
                        i = j
    
                    if word[i] == first and i < len(word) - 1 and word[i + 1] == second:
                        new_word.append(first + second)
                        i += 2
                    else:
                        new_word.append(word[i])
                        i += 1
                new_word = tuple(new_word)
                word = new_word
                if len(word) == 1:
                    break
                else:
                    pairs = get_pairs(word)
            word = " ".join(word)
            self.cache[token] = word
            return word
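
    A round-trip sketch for the tokenizer, assuming local vocab.json/merges.txt BPE files (the paths are placeholders):

    # placeholder paths to the standard GPT2 BPE files
    tokenizer = GPT2Tokenizer(vocab_file="./vocab.json", merges_file="./merges.txt")
    ids = tokenizer("hello world")["input_ids"]
    print(ids)                    # BPE token ids, with bos/eos added when add_bos_token/add_eos_token are True
    print(tokenizer.decode(ids))  # recovers the text, plus the special tokens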
    

2.3 Generating MindRecord Files

MindRecord is MindSpore's own data format, which enables efficient data processing. Converting the input data to MindRecord is the dataset-processing step, and this processing script must be written by the user. The following develops the GPT2 data-processing script, using the Wikitext2 dataset as an example.

Importing dependencies

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import collections
import logging
import re
import numpy as np

from mindspore.mindrecord import FileWriter
from mindformers import AutoTokenizer

Data reading

  • Write a preprocess_data function that takes an input_file argument and reads the contents of the dataset file line by line from that path.


def preprocess_data(input_file):
  """ 数据读入 """
  dataset_valid = []
  passage = []
  count = 0
  with open(input_file, 'r', encoding='utf-8') as f:
      for line in f:
          line = line.strip()
          if line:
              if line.startswith('=') and line.endswith('=') and passage:
                  dataset_valid.append(passage)
                  count += 1
                  passage = []
              elif line.startswith('=') and line.endswith('='):
                  continue
              else:
                  passage.append(line)
  print('read {} file finished!\n total count = {}'.format(input_file, count))

  res = []
  for line in dataset_valid:
      text = ""
      for sentence in line:
          sentence = wikitext_clean(sentence)
          text = text + " " + sentence
      text = text.strip()
      res.append(text)
  return res

Data cleaning

  • Write the data-cleaning logic according to the format of the Wikitext2 dataset; the wikitext_clean function below is called by preprocess_data. A before/after example follows the function.

def wikitext_clean(string):
  """ 数据清洗 """
  # 消除空格
  string = string.replace("s '", "s'")
  string = re.sub(r"/' [0-9]/", r"/'[0-9]/", string)
  # 处理数字分隔符
  string = string.replace(" @-@ ", "-")
  string = string.replace(" @,@ ", ",")
  string = string.replace(" @.@ ", ".")
  # 处理标点符号
  string = string.replace(" : ", ": ")
  string = string.replace(" ; ", "; ")
  string = string.replace(" . ", ". ")
  string = string.replace(" .", ".")
  string = string.replace(" ! ", "! ")
  string = string.replace(" ? ", "? ")
  string = string.replace(" , ", ", ")
  # 处理括号
  string = re.sub(r"\(\s*([^\)]*?)\s*\)", r"(\1)", string)
  string = re.sub(r"\[\s*([^\]]*?)\s*\]", r"[\1]", string)
  string = re.sub(r"{\s*([^}]*?)\s*}", r"{\1}", string)
  string = re.sub(r"\"\s*([^\"]*?)\s*\"", r'"\1"', string)
  string = re.sub(r"'\s*([^']*?)\s*'", r"'\1'", string)
  # 其他处理项
  string = string.replace("= = = =", "====")
  string = string.replace("= = =", "===")
  string = string.replace("= =", "==")
  string = string.replace(" " + chr(176) + " ", chr(176))
  string = string.replace(" \n", "\n")
  string = string.replace("\n ", "\n")
  string = string.replace(" N ", " 1 ")
  string = string.replace(" 's", "'s")

  return string
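
A small before/after example of the cleaning rules above:

raw = "The gun weighed 20 @.@ 3 tonnes ( 20 @,@ 000 kg ) ."
print(wikitext_clean(raw))
# The gun weighed 20.3 tonnes (20,000 kg).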

Converting text to tokens

  • Instantiate a GPT2Tokenizer and write a create_instance function; pass the tokenizer and the relevant arguments into it to process the tokens generated for each sample.

def create_instance(tokenizer, sentence, ids, max_length=None):
  """Convert text to tokens"""
  pair_ids = None
  if len(sentence) == 2:
      pair_ids = tokenizer.encode(sentence[1])
  output = tokenizer.prepare_for_model(ids=ids,
                                       pair_ids=pair_ids,
                                       add_special_tokens=False,
                                       max_length=max_length,
                                       padding='max_length',
                                       truncate_direction="LEFT",
                                       return_overflowing_tokens=False,
                                       return_attention_mask=True)
  return output

Converting to MindRecord format

  • Using the writer object constructed with MindSpore, write the three arrays produced for each input sample in instance (input_ids/attention_mask/labels) into the MindRecord file.

def write_instance_to_file(writer, instance):
  """将数据写入mindrecord中"""
  input_ids = instance["input_ids"]
  attention_mask = instance["attention_mask"]
  labels = instance["input_ids"]

  features = collections.OrderedDict()
  features["input_ids"] = np.asarray(input_ids).astype(np.int32)
  features["attention_mask"] = np.asarray(attention_mask).astype(np.int32)
  features["labels"] = np.asarray(labels).astype(np.int32)

  # write as MindRecord raw data
  writer.write_raw_data([features])

  return features

Main function

  • Finally, implement the main-function logic that drives the whole data-processing flow.

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_file", type=str, default="../wikitext-2/wiki.valid.tokens",
                        help='Input raw text file. ')
    parser.add_argument("--output_file", type=str, default="../wikitext2_processed/wikitext-2.mindrecord",
                        help='Output MindRecord file. ')
    parser.add_argument("--num_splits", type=int, default=1,
                        help="The MindRecord file will be split into the number of partition. ")
    parser.add_argument("--max_length", type=int, default=1024, help="Maximum sequence length. ")
    parser.add_argument("--tokenizer_type", type=str, default="gpt2",
                        help="Tokenizer type, can be set to any tokenizer "
                            "if its relevant model supports prompt text classification. ")
    parser.add_argument("--data_columns", type=list, default=["input_ids", "attention_mask"],
                        help="The data columns which should be saved in mindrecord. This can refer used yaml file. ")
    args = parser.parse_args()

    # instantiate a GPT2Tokenizer
    tokenizer = AutoTokenizer.from_pretrained(args.tokenizer_type)
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.add_tokens([])

    input_file = args.input_file
    logging.info("***** Reading from input files *****")
    logging.info("Input File: %s", input_file)

    output_file = args.output_file
    logging.info("***** Writing to output files *****")
    logging.info("Output File: %s", output_file)

    # construct the FileWriter that performs the MindRecord conversion
    writer = FileWriter(output_file, args.num_splits)

    # predefine the MindRecord output columns: ["input_ids", "attention_mask", "labels"]
    data_schema = {"input_ids": {"type": "int32", "shape": [-1]},
                   "attention_mask": {"type": "int32", "shape": [-1]},
                   "labels": {"type": "int32", "shape": [-1]}
                   }
    data_columns = args.data_columns
    need_del_keys = set(data_columns) - set(data_schema.keys())
    for need_del_key in need_del_keys:
        del data_schema[need_del_key]
    writer.add_schema(data_schema, "lm-schema")

    # preprocess the wikitext2 data
    dataset_valid = preprocess_data(args.input_file)
    total_written = 0
    logging.info("***** Reading from  %s *****", input_file)
    text_total = "\n".join(dataset_valid)  # the logic of \n is copied from modelzoo
    sentence = text_total.strip().split("\t")
    block_size = args.max_length

    # convert the text to tokens
    total_ids = tokenizer.encode(sentence[0])
    total_length = len(total_ids)
    total_length = (total_length // block_size) * block_size
    print("total_length", total_length)

    # convert the data to MindRecord block by block
    for i in range(total_length // block_size):
        ids = total_ids[block_size*i:block_size*(i+1)]
        output = create_instance(tokenizer, sentence, ids, args.max_length)
        write_instance_to_file(writer, instance=output)
        total_written += 1

    # flush the in-memory data to disk, producing the MindRecord file
    writer.commit()
    logging.info("Wrote %d total instances", total_written)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()
  • Merging the code above yields the complete processing script for the wikitext2 dataset, mindformers/tools/dataset_preprocess/gpt2/wikitext2_data_process.py. Usage (training uses max_length 1025 so that the input/label sequences obtained by the one-token shift are both 1024 long):

    # training
    python mindformers/tools/dataset_preprocess/gpt2/wikitext2_data_process.py --input_file ./wikitext-2/wiki.train.tokens --output_file ./wikitext-2.train.mindrecord --max_length 1025
    # evaluation
    python mindformers/tools/dataset_preprocess/gpt2/wikitext2_data_process.py --input_file ./wikitext-2/wiki.valid.tokens --output_file ./wikitext-2.valid.mindrecord --max_length 1024
    

2.4 Dataset Construction

For different downstream tasks there are many ways to construct a dataset. The following uses the Causal Language Model Dataset as an example to build the dataset for GPT2 training.

  • MindFormers implements the BaseDataset base class, which users can inherit directly; implement your own if you have special requirements.

  • @MindFormerRegister.register(MindFormerModuleType.DATASET): this decorator registers CausalLanguageModelDataset into the corresponding module, so it can be instantiated from a configuration file.

  • The constructed Dataset class is MindSpore's general dataset API; see the MindSpore documentation for the specific Dataset interfaces. A usage sketch follows the code below.

    # causal_language_model_dataset.py
    # Only part of the core code is shown below; see MindFormers for the full implementation.
    @MindFormerRegister.register(MindFormerModuleType.DATASET)
    class CausalLanguageModelDataset(BaseDataset):
        """ class description"""
        def __new__(cls, dataset_config: dict = None):
            logger.info("Now Create Causal Language Model Dataset.")
            # account for distributed training/finetuning
            rank_id = get_real_rank()
            device_num = get_real_group_size()
            dataset_config = copy.deepcopy(dataset_config)
            cls.init_dataset_config(dataset_config)
            rank_id, device_num = cls._check_device_rank_for_parallel(rank_id, device_num)
            dataset_config.rank_id = rank_id
            dataset_config.device_num = device_num
    
            # build the dataset according to dataset_config
            if dataset_config.data_loader.type != "MindDataset" and \
                    dataset_config.data_loader.type != "TFRecordDataset":
                dataset = cls._process_raw_text_data(dataset_config)
            else:
                dataset = cls._process_mindrecord_data(dataset_config)
    
            type_cast_op = C.TypeCast(mstype.int32)
            if dataset_config.eos_reset:
                if cls._is_semi_full_batch() or cls._is_data_parallel():
                    rank_id = 0
                    dis = dataset_config.batch_size
                else:
                    # each device takes its own slice of the full batch
                    dis = dataset_config.batch_size // device_num
                    if dataset_config.batch_size % device_num != 0:
                        raise ValueError(
                            f"batch size {dataset_config.batch_size} should be a multiple of device number {device_num}."
                            " You should change the args: per_batch_size.")
    
                # define the dataset batch size
                dataset = dataset.batch(dataset_config.batch_size,
                                        drop_remainder=dataset_config.drop_remainder,
                                        output_columns=dataset_config.input_columns)
                # custom data-augmentation function
                map_func = lambda input_ids: get_input_data_batch_slice_map(input_ids,
                                                                            eos_token_id=dataset_config.eos_token_id,
                                                                            rank_id=rank_id,
                                                                            dis=dis)
                # apply the custom augmentation to the dataset
                if is_version_ge(mindspore.__version__, '1.11.0'):
                    dataset = dataset.map(operations=map_func, input_columns=dataset_config.input_columns,
                                        output_columns=dataset_config.output_columns)
                else:
                    dataset = dataset.map(operations=map_func, input_columns=dataset_config.input_columns,
                                        output_columns=dataset_config.output_columns,
                                        column_order=dataset_config.output_columns)
                # project the dataset columns into the specified order
                dataset = dataset.project(columns=dataset_config.output_columns)
    
                for input_arg in dataset_config.output_columns:
                    dataset = dataset.map(operations=type_cast_op, input_columns=input_arg)
            else:
                dataset = dataset.batch(dataset_config.batch_size,
                                        drop_remainder=dataset_config.drop_remainder,
                                        output_columns=dataset_config.input_columns,
                                        num_parallel_workers=dataset_config.num_parallel_workers)
                dataset = dataset.project(columns=dataset_config.input_columns)
                for input_arg in dataset_config.input_columns:
                    dataset = dataset.map(operations=type_cast_op, input_columns=input_arg)
    
            # number of times the dataset is repeated
            dataset = dataset.repeat(dataset_config.repeat)
    
            return dataset
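
    A hedged usage sketch: CausalLanguageModelDataset.__new__ above returns a ready MindSpore dataset, so it can be built directly from the train_dataset_task section of the yaml (field names follow the run_gpt2 yaml convention):

    from mindformers.tools.register import MindFormerConfig

    config = MindFormerConfig("configs/gpt2/run_gpt2.yaml")
    dataset = CausalLanguageModelDataset(dataset_config=config.train_dataset_task.dataset_config)
    for batch in dataset.create_dict_iterator():
        print(batch["input_ids"].shape)  # e.g. (batch_size, seq_length + 1) for the wikitext2 training data
        break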
    

3 Task Construction and Registration

To use a task already supported by MindFormers, refer to the Trainer API, Pipeline API, and Task documentation.

GPT2 is a large language model whose main tasks are text generation and dialogue QA. This section takes the text-generation task as an example to introduce the concrete logic of GPT2 text generation and the task-registration process. In MindFormers, downstream tasks such as text generation, image classification, and dialogue QA are built in the form of Pipelines, and the BasePipeline base class is already implemented in the MindFormers repository. The TextGenerationPipeline to be completed can inherit BasePipeline directly and implement its specific logic; if the downstream task is more complex, you can write the Pipeline class from scratch.

  • @MindFormerRegister.register(MindFormerModuleType.PIPELINE, alias="text_generation"): this decorator registers TextGenerationPipeline, so it can be instantiated through the high-level interfaces from a configuration file.

  • TextGenerationPipeline covers the pre-, model-, and post-processing of the input data, mainly the following steps (a usage sketch follows the code below):

    1. Preprocessing: initialize the GPT2Tokenizer and convert the input inputs to tokens, producing input_ids.

    2. Model processing: take the input_ids from preprocessing as model_inputs and call the network's generate method to run the forward computation, obtaining the output_ids of the generated text.

    3. Postprocessing: decode output_ids with the tokenizer, converting the tokens back into text to get the final output.

    # text_generation_pipeline.py
    # Only part of the core code is shown below; see MindFormers for the full implementation.
    __all__ = ['TextGenerationPipeline']
    
    @MindFormerRegister.register(MindFormerModuleType.PIPELINE, alias="text_generation")
    class TextGenerationPipeline(BasePipeline):
        """class description"""
        _support_list = _setup_support_list(["gpt2", "glm"])
        return_name = 'text_generation'
    
        def __init__(self, model: Union[str, PreTrainedModel, Model],
                    tokenizer: Optional[PreTrainedTokenizerBase] = None,
                    **kwargs):
            # check the model/tokenizer input types
            if isinstance(model, str):
                if model in self._support_list or os.path.isdir(model):
                    if tokenizer is None:
                        tokenizer = AutoProcessor.from_pretrained(model).tokenizer
                    model = AutoModel.from_pretrained(model)
                    if not isinstance(tokenizer, PreTrainedTokenizerBase):
                        raise TypeError(f"tokenizer should be inherited from"
                                        f" PreTrainedTokenizerBase, but got {type(tokenizer)}.")
                else:
                    raise ValueError(f"{model} is not supported by {self.__class__.__name__},"
                                    f"please selected from {self._support_list}.")
            if not isinstance(model, (PreTrainedModel, Model)):
                raise TypeError(f"model should be inherited from PreTrainedModel or Model, but got type {type(model)}.")
            if tokenizer is None:
                raise ValueError(f"{self.__class__.__name__}"
                                " requires for a tokenizer.")
            super().__init__(model, tokenizer, **kwargs)
    
        # Pipeline input-parameter handling
        def _sanitize_parameters(self, **pipeline_parameters):
            """function description"""
            # preprocessing parameters
            preprocess_keys = ['keys', 'add_special_tokens']
            preprocess_params = {}
            for item in preprocess_keys:
                if item in pipeline_parameters:
                    preprocess_params[item] = pipeline_parameters.get(item)
    
            # postprocessing parameters
            postprocess_params = {}
    
            # model (forward) parameters
            forward_key_name = ['top_k', 'top_p', 'do_sample', 'eos_token_id', 'repetition_penalty', 'max_length', 'seed']
            forward_kwargs = {}
            for item in forward_key_name:
                if item in pipeline_parameters:
                    forward_kwargs[item] = pipeline_parameters.get(item)
            return preprocess_params, forward_kwargs, postprocess_params
    
        # Pipeline preprocessing
        def preprocess(self, inputs: Union[str, dict, Tensor],
                    **preprocess_params):
            """function description"""
            add_special_tokens = preprocess_params.get('add_special_tokens', True)
    
            # fetch the input data
            if isinstance(inputs, dict):
                keys = preprocess_params.get('keys', None)
                default_src_language_name = 'text'
                feature_name = keys.get('src_language', default_src_language_name) if keys else default_src_language_name
                inputs = inputs[feature_name]
                if isinstance(inputs, mindspore.Tensor):
                    inputs = inputs.asnumpy().tolist()
    
            # convert the text to tokens with the tokenizer
            input_ids = self.tokenizer(inputs, return_tensors=None, add_special_tokens=add_special_tokens)["input_ids"]
            return {"input_ids": input_ids}
    
        # Pipeline model processing
        def forward(self, model_inputs: dict,
                    **forward_params):
            """function description"""
            forward_params.pop("None", None)
            input_ids = model_inputs["input_ids"]
    
            # feed the tokens into the network and generate the output token sequence
            output_ids = self.network.generate(input_ids, **forward_params)
            return {"output_ids": output_ids}
    
        # Pipeline postprocessing
        def postprocess(self, model_outputs: dict,
                        **postprocess_params):
            """function description"""
            # decode the output token sequence back into text
            outputs = self.tokenizer.decode(model_outputs["output_ids"], skip_special_tokens=True)
            return [{self.return_name + '_text': outputs}]
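
    A usage sketch of the registered pipeline; the generation parameters are illustrative:

    from mindformers.pipeline import pipeline

    # route 1: resolve by the "text_generation" alias registered above
    generator = pipeline("text_generation", model="gpt2")

    # route 2: construct the class directly from a supported model name
    generator = TextGenerationPipeline("gpt2")

    print(generator("An increasing sequence: one,", max_length=32, do_sample=False))
    # [{'text_generation_text': '...generated text...'}]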
    
  • text_generation_pipeline.py calls the network's generate method; MindFormers already implements it in mindformers/models/text_generator.py, so users can use it directly. If you have special requirements, refer to that script to implement text generation yourself. Part of the core logic is shown below, followed by a usage sketch:

    # text_generator.py
    # Only part of the core code is shown below; see MindFormers for the full implementation.
    class GenerationMixin:
        """class description"""
        def __init__(self):
            pass  
        ...
    
        def _forward(self, origin_inputs, top_k, top_p, repetition_penalty, max_length, eos_token_id, streamer=None, pad_token_id=None):
            """function description"""
            if pad_token_id is None:
                pad_token_id = 0
            use_pynative = True
    
            # if a streamer is passed in, use streaming inference
            if streamer is not None:
                streamer.put(origin_inputs[0])
    
            # get the input shape and the model settings
            batch_size = origin_inputs.shape[0]
            is_encoder_decoder = self.config.is_encoder_decoder
            logger.debug("The input shape is: %s", origin_inputs.shape)
            valid_length_each_example = []
            for i in range(batch_size):
                # get the true length of each input token sequence (excluding padding)
                valid_length_each_example.append(np.max(np.argwhere(origin_inputs[i] != pad_token_id)) + 1)
            valid_length_each_example = np.array(valid_length_each_example)
            logger.debug("Get the valid for each example is: %s", valid_length_each_example)
            if not is_encoder_decoder and np.max(valid_length_each_example) > max_length:
                raise ValueError("The max_length set is smaller than the length in the input_ids. You shout set "
                                f"max_length to {np.max(valid_length_each_example)}")
            target_length = self.config.seq_length if max_length > self.config.seq_length else max_length
            logger.debug("max target_length is: %s", target_length)
            frequency_list = None
            # pad the input token sequence to target_length
            input_ids = self._pad_inputs_using_max_length(origin_inputs=origin_inputs, pad_token_id=pad_token_id)
            logger.debug("pad the origin inputs from %s into shape: %s", origin_inputs.shape, input_ids.shape)
    
            # generate the input_mask
            input_mask = np.zeros_like(input_ids)
            for i in range(valid_length_each_example.shape[0]):
                input_mask[i, :valid_length_each_example[i]] = 1
            encoder_output = None
            encoder_mask = None
    
            if is_encoder_decoder:
                ...
    
            # generate tokens in a loop until every entry of is_finished is True
            is_finished = [False] * batch_size
    
            # if config.use_past is True, use incremental inference
            if self.config.use_past:
                self.is_first_iteration = True
            is_first_iteration = False
    
            # start generating text
            while np.sum(is_finished) != batch_size:
                if is_encoder_decoder:
                    ...
                else:
                    seq_length = input_ids.shape[1]
                    current_index = [valid_length_each_example[i] - 1 + i * seq_length for i in range(batch_size)]
                    logger.debug("validate length: %s", valid_length_each_example)
    
                    # use incremental inference
                    if self.config.use_past:
                        is_first_iteration = self.is_first_iteration
    
                        # generate the position_ids and attention_mask needed for incremental inference
                        position_ids, attention_mask = self.generate_pos_id_and_mask_for_incr_infer(
                            input_ids=input_ids,
                            current_index=current_index,
                            valid_length_each_example=valid_length_each_example
                        )
                        # inference
                        logits = self._incremental_infer(
                            input_ids=input_ids,
                            position_ids=position_ids,
                            attention_mask=attention_mask,
                            current_index=current_index,
                            valid_length_each_example=valid_length_each_example
                        )[0]
                    # without incremental inference
                    else:
                        logits = self(Tensor(input_ids, mstype.int32))[0]  # pylint: disable=E1102
                    logits = logits.reshape(-1, logits.shape[-1])
                    log_probs = self.process_logits(logits, Tensor(current_index, mstype.int32),
                                                    is_first_iteration, self.config.use_past)
    
                # sample the model's output logits, producing selection probabilities p and indices p_args for the generated tokens
                log_probs = log_probs.asnumpy()
                vocab_size = log_probs.shape[-1]
                if repetition_penalty != 1 and frequency_list is None:
                    frequency_list = np.array([[0 for _ in range(vocab_size)]])
                log_probs_revised = log_probs.reshape(batch_size, vocab_size)
                if repetition_penalty != 1:
                    log_probs_revised = log_probs - frequency_list * repetition_penalty - \
                                        (frequency_list > 0) * repetition_penalty
                p, p_args = sampler(log_probs_revised, top_p, top_k, use_pynative)
    
                # randomly pick the generated token according to p and p_args
                for i in range(batch_size):
                    if is_finished[i]:
                        continue
    
                    # sample p_args according to the probabilities in p
                    target_index = np.random.choice(len(p[i]), p=p[i])
                    target = p_args[i][target_index]
    
                    if repetition_penalty != 1:
                        frequency_list[0][target] = frequency_list[0][target] + 1
                    input_ids[i, valid_length_each_example[i]] = p_args[i, target_index]
    
                    # streaming inference
                    if streamer is not None:
                        streamer.put(np.asarray([target]))
    
                    if is_encoder_decoder:
                        target_mask[i][valid_length_each_example[i]] = int(1)
    
                    valid_length_each_example[i] += int(1)
                    input_mask[i][valid_length_each_example[i] - 1] = 1
    
                    # check whether the stop condition is reached
                    if p_args[i][target_index] == eos_token_id or valid_length_each_example[i] == target_length:
                        is_finished[i] = True
                        continue
    
            # all text generated; return all output_ids
            output_ids = []
            for i in range(batch_size):
                output_ids.append(input_ids[i, : int(valid_length_each_example[i])].astype(np.int32))
            logger.debug("The output is: %s", output_ids)
            if streamer is not None:
                streamer.end()
            return output_ids
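
    A hedged sketch of calling generate directly (the sampling parameters are illustrative):

    from mindformers import GPT2LMHeadModel, GPT2Tokenizer

    model = GPT2LMHeadModel.from_pretrained("gpt2")
    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

    input_ids = tokenizer("My favourite season is")["input_ids"]
    # _forward above pads the inputs, iterates token by token, samples with
    # top_k/top_p, and stops at eos_token_id or max_length
    output_ids = model.generate([input_ids], max_length=32, top_k=3, top_p=0.9, do_sample=True)
    print(tokenizer.decode(output_ids[0], skip_special_tokens=True))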
    

4 GPT2 Training and Inference

4.1 Pretraining and Finetuning

  • Implement GPT2's training and finetuning scripts; pretraining or finetuning can be launched via MindFormers' scripts/run_distribute.sh. The training logic is already implemented in mindformers/trainer/base_trainer.py, and users can use this base class directly; if the training flow has special requirements, modify the training_process method to customize it.

    # base_trainer.py
    # Only part of the core code is shown below; see MindFormers for the full implementation.
    def training_process(
            self,
            config: Optional[Union[dict, MindFormerConfig, ConfigArguments, TrainingArguments]] = None,
            network: Optional[Union[Cell, PreTrainedModel]] = None,
            dataset: Optional[Union[BaseDataset, GeneratorDataset]] = None,
            optimizer: Optional[Optimizer] = None,
            wrapper: Optional[TrainOneStepCell] = None,
            callbacks: Optional[Union[Callback, List[Callback]]] = None,
            **kwargs):
        """训练及微调逻辑实现"""
    
        # get the training/finetuning settings
        self.kwargs = kwargs
        is_full_config = kwargs.get("is_full_config", False)
        config = self.set_config(config, is_full_config)
    
        # if resume_training is True and load_checkpoint is a valid ckpt path, load the pretrained ckpt
        if config.resume_training and config.load_checkpoint:
            logger.info(".............Start load resume context from checkpoint..................")
            load_resume_context_from_checkpoint(config)
    
        # build the dataset
        logger.info(".........Build Dataset For Train..........")
        if dataset is None:
            dataset = self.create_train_dataset()
        self.set_train_dataset(dataset)
        check_runner_config(config, dataset)
        if config.runner_config.sink_mode:
            epoch_num = math.ceil((config.runner_config.epochs - config.runner_config.initial_epoch)
                                  * config.runner_config.sink_size / dataset.get_dataset_size())
            dataset._dataset_helper = DatasetHelper(dataset, config.runner_config.sink_mode,
                                                    config.runner_config.sink_size, epoch_num)
    
        # build the training network
        logger.info(".........Build Net For Train..........")
        if network is None and wrapper is None and \
                self.model_wrapper is None and self.network is None:
            if self.get_pipeline_stages() > 1:
                # with pipeline parallelism, use the pipeline network construction method
                network = self.create_pipeline_network(default_args={"parallel_config": config.parallel_config,
                                                                     "moe_config": config.moe_config})
    
            else:
                # otherwise use the ordinary construction method
                network = self.create_network(default_args={"parallel_config": config.parallel_config,
                                                            "moe_config": config.moe_config})
        # if no new network/wrapper is passed in but a network instance already exists, reuse it
        elif network is None and wrapper is None and self.network is not None:
            logger.info(".........Using The Existing Network For Train:: %s", self.network.__class__.__name__)
            network = self.network
    
        if network is not None:
            self.set_network(network, is_train=True)
    
        if wrapper is not None:
            self.set_model_wrapper(wrapper)
    
        self.count_parameters()
    
        # build the optimizer
        logger.info(".........Build Optimizer For Train..........")
        if optimizer is None and wrapper is None and self.model_wrapper is None:
            optimizer = self.create_optimizer_scheduler(network, layer_scale=config.layer_scale)
    
        # build the model wrapper
        if wrapper is None and self.model_wrapper is None:
            logger.info(".........Build Running Wrapper From Config For Train..........")
            wrapper = self.create_model_wrapper(network, optimizer)
        elif wrapper is None and self.model_wrapper is not None:
            logger.info(".........Using The Existing Model Wrapper: %s", self.model_wrapper.__class__.__name__)
            wrapper = self.model_wrapper
    
        # build the model callbacks
        logger.info(".........Build Callbacks For Train..........")
        if callbacks is None:
            callbacks = self.create_callbacks(default_args={
                "learning_rate": optimizer.learning_rate if optimizer else wrapper.optimizer.learning_rate,
                "origin_epochs": config.runner_config.origin_epochs,
                "dataset_size": config.data_size,
                "micro_batch_interleave_num": config.micro_batch_interleave_num,
                "micro_batch_num": config.parallel_config.micro_batch_num,
                "initial_epoch": config.runner_config.initial_epoch})
    
        # define the metrics computed during in-training evaluation
        compute_metrics = None
        if config.do_eval:
            compute_metrics = self.create_metrics()
    
        # define the model with the high-level Model API
        logger.info(".........Starting Init Train Model..........")
        if wrapper is not None:
            model = Model(wrapper, metrics=compute_metrics, eval_network=network)
        else:
            model = Model(network, optimizer=optimizer, metrics=compute_metrics, eval_network=network)
    
        # resume-training / finetuning flow
        if config.load_checkpoint or config.only_save_strategy:
            if config.resume_training:
                logger.info(".............Start resume training from checkpoint..................")
                transform_and_load_checkpoint(config, model, network, dataset, optimizer=optimizer)
            else:
                if config.load_checkpoint in SUPPORT_MODEL_NAMES:
                    config.load_checkpoint = \
                        AutoModel.from_pretrained(config.load_checkpoint).default_checkpoint_download_path
                transform_and_load_checkpoint(config, model, network, dataset)
    
        # evaluate-during-training feature
        if config.do_eval:
            logger.info(".........Build Evaluate in Training Callback..........")
            eval_dataset = kwargs.get('eval_dataset', None)
            if eval_dataset is None:
                eval_dataset = self.create_eval_dataset()
    
            eval_callback = EvalCallBack(
                partial(
                    self._evaluate_in_training,
                    model=model,
                    eval_dataset=eval_dataset,
                ),
                step_interval=config.eval_step_interval if config.eval_step_interval else -1,
                epoch_interval=config.eval_epoch_interval if config.eval_epoch_interval else 1,
            )
            callbacks.append(eval_callback)
    
        # model training
        logger.info(".........Starting Training Model..........")
        logger.info(".........Model Compiling, Please Wait a Moment...........")
        model.train(config.runner_config.epochs, dataset,
                    callbacks=callbacks,
                    dataset_sink_mode=config.runner_config.sink_mode,
                    sink_size=config.runner_config.sink_size,
                    initial_epoch=config.runner_config.initial_epoch)
        logger.info(".........Training Over!.............")
    

4.2 Launching with Scripts

  • After the training logic is complete, the next step is to set up the config file needed for pretraining or finetuning and launch the task with the training scripts MindFormers provides; take configs/gpt2/run_gpt2_13b.yaml as an example:

    # run_gpt2_13b.yaml
    # Only part of the configuration is shown below; see MindFormers for the full content.
    seed: 0
    run_mode: 'train'
    output_dir: './output' # path to save checkpoint/strategy
    load_checkpoint: ""
    src_strategy_path_or_dir: ''
    auto_trans_ckpt: False  # If true, auto transform load_checkpoint to load in distributed model
    only_save_strategy: False
    resume_training: False
    
    # context
    context:
      mode: 0 # 0--Graph Mode; 1--Pynative Mode
      device_target: "Ascend"
      enable_graph_kernel: False
      graph_kernel_flags: "--disable_expand_ops=Softmax,Dropout --enable_parallel_fusion=true --reduce_fuse_depth=8 --enable_auto_tensor_inplace=true"
      max_call_depth: 10000
      max_device_memory: "30GB"
      save_graphs: False
      save_graphs_path: "./graph"
      device_id: 0
    
    ...
    
  • Then, taking single-node eight-device training as an example: with the training config ready, launching the training task also requires the RANK_TABLE_FILE json file; running mindformers/tools/hccl_tools.py generates the rank_table:

    python ./mindformers/tools/hccl_tools.py --device_num "[0,8)"
    
  • Finally, distributed training can be launched via scripts/run_distribute.sh. RANK_TABLE_FILE is the path to the rank_table; when RUN_STATUS is train, the pretraining flow is started, and when finetune is passed, the finetuning flow is started.

    cd scripts
    bash run_distribute.sh {RANK_TABLE_FILE} ../configs/gpt2/run_gpt2_13b.yaml [0,8) {RUN_STATUS}