import os import torch import stat import re from functools import partial from typing import List, Tuple from SwissArmyTransformer import mpu from evaluation.model import batch_filling_sequence from generation import BeamSearchStrategy, BaseStrategy from SwissArmyTransformer.generation.utils import timed_name from initialize import initialize, initialize_model_and_tokenizer import torch.distributed as dist import time import gradio as gr def add_generation_specific_args(parser): parser.add_argument("--sampling-strategy", type=str, default="BaseStrategy", help="Type of sampling strategy.") parser.add_argument("--min-gen-length", type=int, default=0, help="The minimum length each blank should generate.") parser.add_argument( "--print-all-beams", action="store_true", help="Print all output generated by beam search strategy." ) def isEnglish(s): try: s.encode(encoding="utf-8").decode("ascii") except UnicodeDecodeError: return False else: return True def get_masks_and_position_ids(seq, mask_position, max_gen_length, gmask=False): context_length = seq.shape[1] tokens = torch.nn.functional.pad(seq, (0, max_gen_length), mode="constant", value=-1) attention_mask = torch.ones((1, tokens.shape[-1], tokens.shape[-1]), device=tokens.device) attention_mask.tril_() attention_mask[..., : context_length - 1] = 1 attention_mask.unsqueeze_(1) attention_mask = (attention_mask < 0.5).bool() position_ids = torch.arange(tokens.shape[-1], dtype=torch.long, device=tokens.device) if not gmask: position_ids[context_length - 1 :] = mask_position position_ids = position_ids.unsqueeze(0) return tokens, attention_mask, position_ids def fill_blanks(raw_text: str, model, tokenizer, strategy) -> Tuple[List[str], List[str], List[List[str]]]: # add MASK generation_mask = "[gMASK]" if "[MASK]" in raw_text: generation_mask = "[MASK]" elif "[sMASK]" in raw_text: generation_mask = "[sMASK]" use_gmask = "[MASK]" not in raw_text and "[sMASK]" not in raw_text mask_pattern = r"\[[sg]?MASK\]" text_list = re.split(mask_pattern, raw_text) pattern_list = re.compile(mask_pattern).findall(raw_text) seq = [] for i in range(len(pattern_list)): pattern = pattern_list[i] sub_text = text_list[i] seq.extend(tokenizer.tokenize(sub_text)) seq.append(tokenizer.get_command(pattern)) seq.extend(tokenizer.tokenize(text_list[-1])) if "MASK]" not in raw_text: seq += [tokenizer.get_command(generation_mask)] raw_text += " " + generation_mask if not raw_text.endswith("MASK]"): seq = seq + [tokenizer.get_command("eos")] if mpu.get_model_parallel_rank() == 0: print("\nInput: {}\n".format(raw_text)) if len(seq) > args.max_sequence_length: raise ValueError("text too long.") # generation is_english = isEnglish(raw_text) output_list = [seq] num_output = args.num_beams if args.sampling_strategy == "BeamSearchStrategy" else 1 last_pos, answers, answers_with_style, blanks = ( [0] * num_output, ["" for _ in range(num_output)], ["" for _ in range(num_output)], [[] for _ in range(num_output)], ) # continually detect the first mark position while True: seq = output_list[0] # detect mask position mask_token = tokenizer.get_command(generation_mask) if mask_token not in seq: break mask_position = seq.index(mask_token) output_list = [] input_seq = torch.cuda.LongTensor( [seq + [tokenizer.get_command("sop")]], device=args.device, ) output, _ = batch_filling_sequence( model, input_seq, torch.cuda.LongTensor([input_seq.shape[-1]], device=args.device), strategy=strategy, get_masks_and_position_ids=partial( get_masks_and_position_ids, mask_position=mask_position, max_gen_length=args.out_seq_length, gmask=use_gmask, ), ) if isinstance(output, torch.Tensor): # different strategies output = output.tolist() output = output[0] # batch_size = 1 output_list.extend(output) # clip -1s and fill back generated things into seq for i in range(len(output_list)): output = output_list[i].tolist() if isinstance(output_list[i], torch.Tensor) else output_list[i] try: unfinished = output.index(-1) except ValueError: unfinished = len(output) if output[unfinished - 1] in strategy.end_tokens: unfinished -= 1 bog = output.index(tokenizer.get_command("sop")) prefix = tokenizer.detokenize(output[last_pos[i] : mask_position]) blank = tokenizer.detokenize(output[bog + 1 : unfinished]) answers_with_style[i] += ( prefix + (" " if is_english else "") + ("\033[4m" if use_gmask else "\x1b[0;32m\033[4m") + blank + ("\033[0m" if use_gmask else "\033[0m\x1b[0m") + (" " if is_english else "") ) blanks[i].append(blank) last_pos[i] = mask_position + unfinished - (bog + 1) output_list[i] = output[:mask_position] + output[bog + 1 : unfinished] + output[mask_position + 1 : bog] for i, output in enumerate(output_list): if output[-1] == tokenizer.get_command("eos"): output = output[:-1] answers_with_style[i] += tokenizer.detokenize(output[last_pos[i] :]) answers[i] = tokenizer.detokenize(output) return answers, answers_with_style, blanks def generate_continually(func, raw_text): if not raw_text: return "Input should not be empty!" try: start_time = time.time() answer = func(raw_text) if torch.distributed.get_rank() == 0: print("\nTaken time {:.2f}\n".format(time.time() - start_time), flush=True) return answer except (ValueError, FileNotFoundError) as e: print(e) return "Error!" strategy = None def main(args): model, tokenizer = initialize_model_and_tokenizer(args) end_tokens = [tokenizer.get_command("eop"), tokenizer.get_command("eos")] def process(raw_text): global strategy if args.with_id: query_id, raw_text = raw_text.split("\t") answers, answers_with_style, blanks = fill_blanks(raw_text, model, tokenizer, strategy) if torch.distributed.get_rank() == 0: print(answers) return answers[0] def predict( text, seed=1234, out_seq_length=200, min_gen_length=20, sampling_strategy="BaseStrategy", num_beams=4, length_penalty=0.9, no_repeat_ngram_size=3, temperature=1, topk=1, topp=1, ): global strategy if torch.distributed.get_rank() == 0: print( "info", [ text, seed, out_seq_length, min_gen_length, sampling_strategy, num_beams, length_penalty, no_repeat_ngram_size, temperature, topk, topp, ], ) dist.broadcast_object_list( [ text, seed, out_seq_length, min_gen_length, sampling_strategy, num_beams, length_penalty, no_repeat_ngram_size, temperature, topk, topp, ], src=0, ) args.seed = seed args.out_seq_length = out_seq_length args.min_gen_length = min_gen_length args.sampling_strategy = sampling_strategy args.num_beams = num_beams args.length_penalty = length_penalty args.no_repeat_ngram_size = no_repeat_ngram_size args.temperature = temperature args.top_k = topk args.top_p = topp if args.sampling_strategy == "BaseStrategy": strategy = BaseStrategy( batch_size=1, temperature=args.temperature, top_k=args.top_k, top_p=args.top_p, end_tokens=end_tokens ) elif args.sampling_strategy == "BeamSearchStrategy": strategy = BeamSearchStrategy( batch_size=1, num_beams=args.num_beams, length_penalty=args.length_penalty, consider_end=True, end_tokens=end_tokens, no_repeat_ngram_size=args.no_repeat_ngram_size, min_gen_length=args.min_gen_length, ) else: raise ValueError(f"unknown strategy {args.sampling_strategy}") return generate_continually(process, text) if torch.distributed.get_rank() == 0: en_fil = ["The Starry Night is an oil-on-canvas painting by [MASK] in June 1889."] en_gen = ["Eight planets in solar system are [gMASK]"] ch_fil = ["凯旋门位于意大利米兰市古城堡旁。1807年为纪念[MASK]而建,门高25米,顶上矗立两武士青铜古兵车铸像。"] ch_gen = ["三亚位于海南岛的最南端,是中国最南部的热带滨海旅游城市 [gMASK]"] en_to_ch = ["Pencil in Chinese is [MASK]."] ch_to_en = ['"我思故我在"的英文是"[MASK]"。'] examples = [en_fil, en_gen, ch_fil, ch_gen, en_to_ch, ch_to_en] with gr.Blocks() as demo: gr.Markdown( """ An Open Bilingual Pre-Trained Model. [Visit our github repo](https://github.com/THUDM/GLM-130B) GLM-130B uses two different mask tokens: `[MASK]` for short blank filling and `[gMASK]` for left-to-right long text generation. When the input does not contain any MASK token, `[gMASK]` will be automatically appended to the end of the text. We recommend that you use `[MASK]` to try text fill-in-the-blank to reduce wait time (ideally within seconds without queuing). Note: We suspect that there is a bug in the current FasterTransformer INT4 implementation that leads to gaps in generations compared to the FP16 model (e.g. more repititions), which we are troubleshooting, and the current model output is **for reference only** """ ) with gr.Row(): with gr.Column(): model_input = gr.Textbox( lines=7, placeholder="Input something in English or Chinese", label="Input" ) with gr.Row(): gen = gr.Button("Generate") clr = gr.Button("Clear") outputs = gr.Textbox(lines=7, label="Output") gr.Markdown( """ Generation Parameter """ ) with gr.Row(): with gr.Column(): seed = gr.Slider(maximum=100000, value=1234, step=1, label="Seed") out_seq_length = gr.Slider( maximum=512, value=128, minimum=32, step=1, label="Output Sequence Length" ) with gr.Column(): min_gen_length = gr.Slider(maximum=64, value=0, step=1, label="Min Generate Length") sampling_strategy = gr.Radio( choices=["BeamSearchStrategy", "BaseStrategy"], value="BaseStrategy", label="Search Strategy" ) with gr.Row(): with gr.Column(): # beam search gr.Markdown( """ BeamSearchStrategy """ ) num_beams = gr.Slider(maximum=4, value=2, minimum=1, step=1, label="Number of Beams") length_penalty = gr.Slider(maximum=1, value=1, minimum=0, label="Length Penalty") no_repeat_ngram_size = gr.Slider( maximum=5, value=3, minimum=1, step=1, label="No Repeat Ngram Size" ) with gr.Column(): # base search gr.Markdown( """ BaseStrategy """ ) temperature = gr.Slider(maximum=1, value=0.7, minimum=0, label="Temperature") topk = gr.Slider(maximum=40, value=0, minimum=0, step=1, label="Top K") topp = gr.Slider(maximum=1, value=0.7, minimum=0, label="Top P") inputs = [ model_input, seed, out_seq_length, min_gen_length, sampling_strategy, num_beams, length_penalty, no_repeat_ngram_size, temperature, topk, topp, ] gen.click(fn=predict, inputs=inputs, outputs=outputs) clr.click(fn=lambda value: gr.update(value=""), inputs=clr, outputs=model_input) gr_examples = gr.Examples(examples=examples, inputs=model_input) gr.Markdown( """ Disclaimer inspired from [BLOOM](https://huggingface.co/spaces/bigscience/bloom-book) GLM-130B was trained on web-crawled data, so it's hard to predict how GLM-130B will respond to particular prompts; harmful or otherwise offensive content may occur without warning. We prohibit users from knowingly generating or allowing others to knowingly generate harmful content, including Hateful, Harassment, Violence, Adult, Political, Deception, etc. """ ) demo.launch(share=True) else: while True: info = [None, None, None, None, None, None, None, None, None, None, None] dist.broadcast_object_list(info, src=0) ( text, seed, out_seq_length, min_gen_length, sampling_strategy, num_beams, length_penalty, no_repeat_ngram_size, temperature, topk, topp, ) = info predict( text, seed, out_seq_length, min_gen_length, sampling_strategy, num_beams, length_penalty, no_repeat_ngram_size, temperature, topk, topp, ) if __name__ == "__main__": args = initialize(extra_args_provider=add_generation_specific_args) with torch.no_grad(): main(args)