app.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. #!/usr/bin/env python
  2. # Copyright (c) Meta Platforms, Inc. and affiliates
  3. # All rights reserved.
  4. #
  5. # This source code is licensed under the license found in the
  6. # MIT_LICENSE file in the root directory of this source tree.
  7. import os
  8. import pathlib
  9. import tempfile
  10. import gradio as gr
  11. import torch
  12. import torchaudio
  13. from fairseq2.assets import InProcAssetMetadataProvider, asset_store
  14. from fairseq2.data import Collater, SequenceData, VocabularyInfo
  15. from fairseq2.data.audio import (
  16. AudioDecoder,
  17. WaveformToFbankConverter,
  18. WaveformToFbankOutput,
  19. )
  20. from seamless_communication.inference import SequenceGeneratorOptions
  21. from fairseq2.generation import NGramRepeatBlockProcessor
  22. from fairseq2.memory import MemoryBlock
  23. from fairseq2.typing import DataType, Device
  24. from huggingface_hub import snapshot_download
  25. from seamless_communication.inference import BatchedSpeechOutput, Translator, SequenceGeneratorOptions
  26. from seamless_communication.models.generator.loader import load_pretssel_vocoder_model
  27. from seamless_communication.models.unity import (
  28. UnitTokenizer,
  29. load_gcmvn_stats,
  30. load_unity_text_tokenizer,
  31. load_unity_unit_tokenizer,
  32. )
  33. from torch.nn import Module
  34. from seamless_communication.cli.expressivity.evaluate.pretssel_inference_helper import PretsselGenerator
  35. from utils import LANGUAGE_CODE_TO_NAME
  36. DESCRIPTION = """\
  37. # Seamless Expressive
  38. [SeamlessExpressive](https://github.com/facebookresearch/seamless_communication) is a speech-to-speech translation model that captures certain underexplored aspects of prosody such as speech rate and pauses, while preserving the style of one's voice and high content translation quality.
  39. """
  40. CACHE_EXAMPLES = os.getenv("CACHE_EXAMPLES") == "1" and torch.cuda.is_available()
  41. CHECKPOINTS_PATH = pathlib.Path(os.getenv("CHECKPOINTS_PATH", "/home/user/app/models"))
  42. if not CHECKPOINTS_PATH.exists():
  43. snapshot_download(repo_id="facebook/seamless-expressive", repo_type="model", local_dir=CHECKPOINTS_PATH)
  44. snapshot_download(repo_id="facebook/seamless-m4t-v2-large", repo_type="model", local_dir=CHECKPOINTS_PATH)
  45. # Ensure that we do not have any other environment resolvers and always return
  46. # "demo" for demo purposes.
  47. asset_store.env_resolvers.clear()
  48. asset_store.env_resolvers.append(lambda: "demo")
  49. # Construct an `InProcAssetMetadataProvider` with environment-specific metadata
  50. # that just overrides the regular metadata for "demo" environment. Note the "@demo" suffix.
  51. demo_metadata = [
  52. {
  53. "name": "seamless_expressivity@demo",
  54. "checkpoint": f"file://{CHECKPOINTS_PATH}/m2m_expressive_unity.pt",
  55. "char_tokenizer": f"file://{CHECKPOINTS_PATH}/spm_char_lang38_tc.model",
  56. },
  57. {
  58. "name": "vocoder_pretssel@demo",
  59. "checkpoint": f"file://{CHECKPOINTS_PATH}/pretssel_melhifigan_wm-final.pt",
  60. },
  61. {
  62. "name": "seamlessM4T_v2_large@demo",
  63. "checkpoint": f"file://{CHECKPOINTS_PATH}/seamlessM4T_v2_large.pt",
  64. "char_tokenizer": f"file://{CHECKPOINTS_PATH}/spm_char_lang38_tc.model",
  65. },
  66. ]
  67. asset_store.metadata_providers.append(InProcAssetMetadataProvider(demo_metadata))
  68. LANGUAGE_NAME_TO_CODE = {v: k for k, v in LANGUAGE_CODE_TO_NAME.items()}
  69. if torch.cuda.is_available():
  70. device = torch.device("cuda:0")
  71. dtype = torch.float16
  72. else:
  73. device = torch.device("cpu")
  74. dtype = torch.float32
  75. MODEL_NAME = "seamless_expressivity"
  76. VOCODER_NAME = "vocoder_pretssel"
  77. # used for ASR for toxicity
  78. m4t_translator = Translator(
  79. model_name_or_card="seamlessM4T_v2_large",
  80. vocoder_name_or_card=None,
  81. device=device,
  82. dtype=dtype,
  83. )
  84. unit_tokenizer = load_unity_unit_tokenizer(MODEL_NAME)
  85. _gcmvn_mean, _gcmvn_std = load_gcmvn_stats(VOCODER_NAME)
  86. gcmvn_mean = torch.tensor(_gcmvn_mean, device=device, dtype=dtype)
  87. gcmvn_std = torch.tensor(_gcmvn_std, device=device, dtype=dtype)
  88. translator = Translator(
  89. MODEL_NAME,
  90. vocoder_name_or_card=None,
  91. device=device,
  92. dtype=dtype,
  93. apply_mintox=False,
  94. )
  95. text_generation_opts = SequenceGeneratorOptions(
  96. beam_size=5,
  97. unk_penalty=torch.inf,
  98. soft_max_seq_len=(0, 200),
  99. step_processor=NGramRepeatBlockProcessor(
  100. ngram_size=10,
  101. ),
  102. )
  103. m4t_text_generation_opts = SequenceGeneratorOptions(
  104. beam_size=5,
  105. unk_penalty=torch.inf,
  106. soft_max_seq_len=(1, 200),
  107. step_processor=NGramRepeatBlockProcessor(
  108. ngram_size=10,
  109. ),
  110. )
  111. pretssel_generator = PretsselGenerator(
  112. VOCODER_NAME,
  113. vocab_info=unit_tokenizer.vocab_info,
  114. device=device,
  115. dtype=dtype,
  116. )
  117. decode_audio = AudioDecoder(dtype=torch.float32, device=device)
  118. convert_to_fbank = WaveformToFbankConverter(
  119. num_mel_bins=80,
  120. waveform_scale=2**15,
  121. channel_last=True,
  122. standardize=False,
  123. device=device,
  124. dtype=dtype,
  125. )
  126. def normalize_fbank(data: WaveformToFbankOutput) -> WaveformToFbankOutput:
  127. fbank = data["fbank"]
  128. std, mean = torch.std_mean(fbank, dim=0)
  129. data["fbank"] = fbank.subtract(mean).divide(std)
  130. data["gcmvn_fbank"] = fbank.subtract(gcmvn_mean).divide(gcmvn_std)
  131. return data
  132. collate = Collater(pad_value=0, pad_to_multiple=1)
  133. AUDIO_SAMPLE_RATE = 16000
  134. MAX_INPUT_AUDIO_LENGTH = 10 # in seconds
  135. def remove_prosody_tokens_from_text(text):
  136. # filter out prosody tokens, there is only emphasis '*', and pause '='
  137. text = text.replace("*", "").replace("=", "")
  138. text = " ".join(text.split())
  139. return text
  140. def preprocess_audio(input_audio_path: str) -> None:
  141. arr, org_sr = torchaudio.load(input_audio_path)
  142. new_arr = torchaudio.functional.resample(arr, orig_freq=org_sr, new_freq=AUDIO_SAMPLE_RATE)
  143. max_length = int(MAX_INPUT_AUDIO_LENGTH * AUDIO_SAMPLE_RATE)
  144. if new_arr.shape[1] > max_length:
  145. new_arr = new_arr[:, :max_length]
  146. gr.Warning(f"Input audio is too long. Only the first {MAX_INPUT_AUDIO_LENGTH} seconds is used.")
  147. torchaudio.save(input_audio_path, new_arr, sample_rate=AUDIO_SAMPLE_RATE)
  148. def run(
  149. input_audio_path: str,
  150. source_language: str,
  151. target_language: str,
  152. ) -> tuple[str, str]:
  153. target_language_code = LANGUAGE_NAME_TO_CODE[target_language]
  154. source_language_code = LANGUAGE_NAME_TO_CODE[source_language]
  155. preprocess_audio(input_audio_path)
  156. with pathlib.Path(input_audio_path).open("rb") as fb:
  157. block = MemoryBlock(fb.read())
  158. example = decode_audio(block)
  159. example = convert_to_fbank(example)
  160. example = normalize_fbank(example)
  161. example = collate(example)
  162. # get transcription for mintox
  163. source_sentences, _ = m4t_translator.predict(
  164. input=example["fbank"],
  165. task_str="S2TT", # get source text
  166. tgt_lang=source_language_code,
  167. text_generation_opts=m4t_text_generation_opts,
  168. )
  169. source_text = str(source_sentences[0])
  170. prosody_encoder_input = example["gcmvn_fbank"]
  171. text_output, unit_output = translator.predict(
  172. example["fbank"],
  173. "S2ST",
  174. tgt_lang=target_language_code,
  175. src_lang=source_language_code,
  176. text_generation_opts=text_generation_opts,
  177. unit_generation_ngram_filtering=False,
  178. duration_factor=1.0,
  179. prosody_encoder_input=prosody_encoder_input,
  180. src_text=source_text, # for mintox check
  181. )
  182. speech_output = pretssel_generator.predict(
  183. unit_output.units,
  184. tgt_lang=target_language_code,
  185. prosody_encoder_input=prosody_encoder_input,
  186. )
  187. with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
  188. torchaudio.save(
  189. f.name,
  190. speech_output.audio_wavs[0][0].to(torch.float32).cpu(),
  191. sample_rate=speech_output.sample_rate,
  192. )
  193. text_out = remove_prosody_tokens_from_text(str(text_output[0]))
  194. return f.name, text_out
  195. TARGET_LANGUAGE_NAMES = [
  196. "English",
  197. "French",
  198. "German",
  199. "Spanish",
  200. ]
  201. with gr.Blocks(css="style.css") as demo:
  202. gr.Markdown(DESCRIPTION)
  203. gr.DuplicateButton(
  204. value="Duplicate Space for private use",
  205. elem_id="duplicate-button",
  206. visible=os.getenv("SHOW_DUPLICATE_BUTTON") == "1",
  207. )
  208. with gr.Row():
  209. with gr.Column():
  210. with gr.Group():
  211. input_audio = gr.Audio(label="Input speech", type="filepath")
  212. source_language = gr.Dropdown(
  213. label="Source language",
  214. choices=TARGET_LANGUAGE_NAMES,
  215. value="English",
  216. )
  217. target_language = gr.Dropdown(
  218. label="Target language",
  219. choices=TARGET_LANGUAGE_NAMES,
  220. value="French",
  221. )
  222. btn = gr.Button()
  223. with gr.Column():
  224. with gr.Group():
  225. output_audio = gr.Audio(label="Translated speech")
  226. output_text = gr.Textbox(label="Translated text")
  227. gr.Examples(
  228. examples=[],
  229. inputs=[input_audio, source_language, target_language],
  230. outputs=[output_audio, output_text],
  231. fn=run,
  232. cache_examples=CACHE_EXAMPLES,
  233. api_name=False,
  234. )
  235. btn.click(
  236. fn=run,
  237. inputs=[input_audio, source_language, target_language],
  238. outputs=[output_audio, output_text],
  239. api_name="run",
  240. )
  241. if __name__ == "__main__":
  242. demo.queue(max_size=50).launch()