Shortcuts

Source code for torchaudio.models.decoder._ctc_decoder

from __future__ import annotations

import itertools as it
from abc import abstractmethod
from collections import namedtuple
from typing import Dict, List, NamedTuple, Optional, Tuple, Union

import torch
import torchaudio
from torchaudio.utils import download_asset

try:
    # We prioritize the version from upstream flashlight here.
    # This will allow applications that use the upstream flashlight
    # alongside torchaudio.
    from flashlight.lib.text.decoder import (
        CriterionType as _CriterionType,
        KenLM as _KenLM,
        LexiconDecoder as _LexiconDecoder,
        LexiconDecoderOptions as _LexiconDecoderOptions,
        LexiconFreeDecoder as _LexiconFreeDecoder,
        LexiconFreeDecoderOptions as _LexiconFreeDecoderOptions,
        LM as _LM,
        LMState as _LMState,
        SmearingMode as _SmearingMode,
        Trie as _Trie,
        ZeroLM as _ZeroLM,
    )
    from flashlight.lib.text.dictionary import (
        create_word_dict as _create_word_dict,
        Dictionary as _Dictionary,
        load_words as _load_words,
    )
except Exception:
    torchaudio._extension._load_lib("libflashlight-text")
    from torchaudio.flashlight_lib_text_decoder import (
        CriterionType as _CriterionType,
        KenLM as _KenLM,
        LexiconDecoder as _LexiconDecoder,
        LexiconDecoderOptions as _LexiconDecoderOptions,
        LexiconFreeDecoder as _LexiconFreeDecoder,
        LexiconFreeDecoderOptions as _LexiconFreeDecoderOptions,
        LM as _LM,
        LMState as _LMState,
        SmearingMode as _SmearingMode,
        Trie as _Trie,
        ZeroLM as _ZeroLM,
    )
    from torchaudio.flashlight_lib_text_dictionary import (
        create_word_dict as _create_word_dict,
        Dictionary as _Dictionary,
        load_words as _load_words,
    )


__all__ = [
    "CTCHypothesis",
    "CTCDecoder",
    "CTCDecoderLM",
    "CTCDecoderLMState",
    "ctc_decoder",
    "download_pretrained_files",
]

_PretrainedFiles = namedtuple("PretrainedFiles", ["lexicon", "tokens", "lm"])


def _construct_trie(tokens_dict, word_dict, lexicon, lm, silence):
    vocab_size = tokens_dict.index_size()
    trie = _Trie(vocab_size, silence)
    start_state = lm.start(False)

    for word, spellings in lexicon.items():
        word_idx = word_dict.get_index(word)
        _, score = lm.score(start_state, word_idx)
        for spelling in spellings:
            spelling_idx = [tokens_dict.get_index(token) for token in spelling]
            trie.insert(spelling_idx, word_idx, score)
    trie.smear(_SmearingMode.MAX)
    return trie


def _get_word_dict(lexicon, lm, lm_dict, tokens_dict, unk_word):
    word_dict = None
    if lm_dict is not None:
        word_dict = _Dictionary(lm_dict)

    if lexicon and word_dict is None:
        word_dict = _create_word_dict(lexicon)
    elif not lexicon and word_dict is None and type(lm) == str:
        d = {tokens_dict.get_entry(i): [[tokens_dict.get_entry(i)]] for i in range(tokens_dict.index_size())}
        d[unk_word] = [[unk_word]]
        word_dict = _create_word_dict(d)

    return word_dict


[docs]class CTCHypothesis(NamedTuple): r"""Represents hypothesis generated by CTC beam search decoder :class:`CTCDecoder`.""" tokens: torch.LongTensor """Predicted sequence of token IDs. Shape `(L, )`, where `L` is the length of the output sequence""" words: List[str] """List of predicted words. Note: This attribute is only applicable if a lexicon is provided to the decoder. If decoding without a lexicon, it will be blank. Please refer to :attr:`tokens` and :func:`~torchaudio.models.decoder.CTCDecoder.idxs_to_tokens` instead. """ score: float """Score corresponding to hypothesis""" timesteps: torch.IntTensor """Timesteps corresponding to the tokens. Shape `(L, )`, where `L` is the length of the output sequence"""
[docs]class CTCDecoderLMState(_LMState): """Language model state.""" @property def children(self) -> Dict[int, CTCDecoderLMState]: """Map of indices to LM states""" return super().children
[docs] def child(self, usr_index: int) -> CTCDecoderLMState: """Returns child corresponding to usr_index, or creates and returns a new state if input index is not found. Args: usr_index (int): index corresponding to child state Returns: CTCDecoderLMState: child state corresponding to usr_index """ return super().child(usr_index)
[docs] def compare(self, state: CTCDecoderLMState) -> CTCDecoderLMState: """Compare two language model states. Args: state (CTCDecoderLMState): LM state to compare against Returns: int: 0 if the states are the same, -1 if self is less, +1 if self is greater. """ pass
[docs]class CTCDecoderLM(_LM): """Language model base class for creating custom language models to use with the decoder."""
[docs] @abstractmethod def start(self, start_with_nothing: bool) -> CTCDecoderLMState: """Initialize or reset the language model. Args: start_with_nothing (bool): whether or not to start sentence with sil token. Returns: CTCDecoderLMState: starting state """ raise NotImplementedError
[docs] @abstractmethod def score(self, state: CTCDecoderLMState, usr_token_idx: int) -> Tuple[CTCDecoderLMState, float]: """Evaluate the language model based on the current LM state and new word. Args: state (CTCDecoderLMState): current LM state usr_token_idx (int): index of the word Returns: (CTCDecoderLMState, float) CTCDecoderLMState: new LM state float: score """ raise NotImplementedError
[docs] @abstractmethod def finish(self, state: CTCDecoderLMState) -> Tuple[CTCDecoderLMState, float]: """Evaluate end for language model based on current LM state. Args: state (CTCDecoderLMState): current LM state Returns: (CTCDecoderLMState, float) CTCDecoderLMState: new LM state float: score """ raise NotImplementedError
[docs]class CTCDecoder: """CTC beam search decoder from *Flashlight* :cite:`kahn2022flashlight`. .. devices:: CPU Note: To build the decoder, please use the factory function :func:`ctc_decoder`. """ def __init__( self, nbest: int, lexicon: Optional[Dict], word_dict: _Dictionary, tokens_dict: _Dictionary, lm: CTCDecoderLM, decoder_options: Union[_LexiconDecoderOptions, _LexiconFreeDecoderOptions], blank_token: str, sil_token: str, unk_word: str, ) -> None: """ Args: nbest (int): number of best decodings to return lexicon (Dict or None): lexicon mapping of words to spellings, or None for lexicon-free decoder word_dict (_Dictionary): dictionary of words tokens_dict (_Dictionary): dictionary of tokens lm (CTCDecoderLM): language model. If using a lexicon, only word level LMs are currently supported decoder_options (_LexiconDecoderOptions or _LexiconFreeDecoderOptions): parameters used for beam search decoding blank_token (str): token corresopnding to blank sil_token (str): token corresponding to silence unk_word (str): word corresponding to unknown """ self.nbest = nbest self.word_dict = word_dict self.tokens_dict = tokens_dict self.blank = self.tokens_dict.get_index(blank_token) silence = self.tokens_dict.get_index(sil_token) transitions = [] if lexicon: trie = _construct_trie(tokens_dict, word_dict, lexicon, lm, silence) unk_word = word_dict.get_index(unk_word) token_lm = False # use word level LM self.decoder = _LexiconDecoder( decoder_options, trie, lm, silence, self.blank, unk_word, transitions, token_lm, ) else: self.decoder = _LexiconFreeDecoder(decoder_options, lm, silence, self.blank, transitions) def _get_tokens(self, idxs: torch.IntTensor) -> torch.LongTensor: idxs = (g[0] for g in it.groupby(idxs)) idxs = filter(lambda x: x != self.blank, idxs) return torch.LongTensor(list(idxs)) def _get_timesteps(self, idxs: torch.IntTensor) -> torch.IntTensor: """Returns frame numbers corresponding to non-blank tokens.""" timesteps = [] for i, idx in enumerate(idxs): if idx == self.blank: continue if i == 0 or idx != idxs[i - 1]: timesteps.append(i) return torch.IntTensor(timesteps)
[docs] def __call__( self, emissions: torch.FloatTensor, lengths: Optional[torch.Tensor] = None ) -> List[List[CTCHypothesis]]: """ Args: emissions (torch.FloatTensor): CPU tensor of shape `(batch, frame, num_tokens)` storing sequences of probability distribution over labels; output of acoustic model. lengths (Tensor or None, optional): CPU tensor of shape `(batch, )` storing the valid length of in time axis of the output Tensor in each batch. Returns: List[List[CTCHypothesis]]: List of sorted best hypotheses for each audio sequence in the batch. """ if emissions.dtype != torch.float32: raise ValueError("emissions must be float32.") if emissions.is_cuda: raise RuntimeError("emissions must be a CPU tensor.") if lengths is not None and lengths.is_cuda: raise RuntimeError("lengths must be a CPU tensor.") B, T, N = emissions.size() if lengths is None: lengths = torch.full((B,), T) float_bytes = 4 hypos = [] for b in range(B): emissions_ptr = emissions.data_ptr() + float_bytes * b * emissions.stride(0) results = self.decoder.decode(emissions_ptr, lengths[b], N) nbest_results = results[: self.nbest] hypos.append( [ CTCHypothesis( tokens=self._get_tokens(result.tokens), words=[self.word_dict.get_entry(x) for x in result.words if x >= 0], score=result.score, timesteps=self._get_timesteps(result.tokens), ) for result in nbest_results ] ) return hypos
[docs] def idxs_to_tokens(self, idxs: torch.LongTensor) -> List: """ Map raw token IDs into corresponding tokens Args: idxs (LongTensor): raw token IDs generated from decoder Returns: List: tokens corresponding to the input IDs """ return [self.tokens_dict.get_entry(idx.item()) for idx in idxs]
[docs]def ctc_decoder( lexicon: Optional[str], tokens: Union[str, List[str]], lm: Union[str, CTCDecoderLM] = None, lm_dict: Optional[str] = None, nbest: int = 1, beam_size: int = 50, beam_size_token: Optional[int] = None, beam_threshold: float = 50, lm_weight: float = 2, word_score: float = 0, unk_score: float = float("-inf"), sil_score: float = 0, log_add: bool = False, blank_token: str = "-", sil_token: str = "|", unk_word: str = "<unk>", ) -> CTCDecoder: """Builds an instance of :class:`CTCDecoder`. Args: lexicon (str or None): lexicon file containing the possible words and corresponding spellings. Each line consists of a word and its space separated spelling. If `None`, uses lexicon-free decoding. tokens (str or List[str]): file or list containing valid tokens. If using a file, the expected format is for tokens mapping to the same index to be on the same line lm (str, CTCDecoderLM, or None, optional): either a path containing KenLM language model, custom language model of type `CTCDecoderLM`, or `None` if not using a language model lm_dict (str or None, optional): file consisting of the dictionary used for the LM, with a word per line sorted by LM index. If decoding with a lexicon, entries in lm_dict must also occur in the lexicon file. If `None`, dictionary for LM is constructed using the lexicon file. (Default: None) nbest (int, optional): number of best decodings to return (Default: 1) beam_size (int, optional): max number of hypos to hold after each decode step (Default: 50) beam_size_token (int, optional): max number of tokens to consider at each decode step. If `None`, it is set to the total number of tokens (Default: None) beam_threshold (float, optional): threshold for pruning hypothesis (Default: 50) lm_weight (float, optional): weight of language model (Default: 2) word_score (float, optional): word insertion score (Default: 0) unk_score (float, optional): unknown word insertion score (Default: -inf) sil_score (float, optional): silence insertion score (Default: 0) log_add (bool, optional): whether or not to use logadd when merging hypotheses (Default: False) blank_token (str, optional): token corresponding to blank (Default: "-") sil_token (str, optional): token corresponding to silence (Default: "|") unk_word (str, optional): word corresponding to unknown (Default: "<unk>") Returns: CTCDecoder: decoder Example >>> decoder = ctc_decoder( >>> lexicon="lexicon.txt", >>> tokens="tokens.txt", >>> lm="kenlm.bin", >>> ) >>> results = decoder(emissions) # List of shape (B, nbest) of Hypotheses """ if lm_dict is not None and type(lm_dict) is not str: raise ValueError("lm_dict must be None or str type.") tokens_dict = _Dictionary(tokens) # decoder options if lexicon: lexicon = _load_words(lexicon) decoder_options = _LexiconDecoderOptions( beam_size=beam_size, beam_size_token=beam_size_token or tokens_dict.index_size(), beam_threshold=beam_threshold, lm_weight=lm_weight, word_score=word_score, unk_score=unk_score, sil_score=sil_score, log_add=log_add, criterion_type=_CriterionType.CTC, ) else: decoder_options = _LexiconFreeDecoderOptions( beam_size=beam_size, beam_size_token=beam_size_token or tokens_dict.index_size(), beam_threshold=beam_threshold, lm_weight=lm_weight, sil_score=sil_score, log_add=log_add, criterion_type=_CriterionType.CTC, ) # construct word dict and language model word_dict = _get_word_dict(lexicon, lm, lm_dict, tokens_dict, unk_word) if type(lm) == str: lm = _KenLM(lm, word_dict) elif lm is None: lm = _ZeroLM() return CTCDecoder( nbest=nbest, lexicon=lexicon, word_dict=word_dict, tokens_dict=tokens_dict, lm=lm, decoder_options=decoder_options, blank_token=blank_token, sil_token=sil_token, unk_word=unk_word, )
def _get_filenames(model: str) -> _PretrainedFiles: if model not in ["librispeech", "librispeech-3-gram", "librispeech-4-gram"]: raise ValueError( f"{model} not supported. Must be one of ['librispeech-3-gram', 'librispeech-4-gram', 'librispeech']" ) prefix = f"decoder-assets/{model}" return _PretrainedFiles( lexicon=f"{prefix}/lexicon.txt", tokens=f"{prefix}/tokens.txt", lm=f"{prefix}/lm.bin" if model != "librispeech" else None, )
[docs]def download_pretrained_files(model: str) -> _PretrainedFiles: """ Retrieves pretrained data files used for :func:`ctc_decoder`. Args: model (str): pretrained language model to download. Valid values are: ``"librispeech-3-gram"``, ``"librispeech-4-gram"`` and ``"librispeech"``. Returns: Object with the following attributes * ``lm``: path corresponding to downloaded language model, or ``None`` if the model is not associated with an lm * ``lexicon``: path corresponding to downloaded lexicon file * ``tokens``: path corresponding to downloaded tokens file """ files = _get_filenames(model) lexicon_file = download_asset(files.lexicon) tokens_file = download_asset(files.tokens) if files.lm is not None: lm_file = download_asset(files.lm) else: lm_file = None return _PretrainedFiles( lexicon=lexicon_file, tokens=tokens_file, lm=lm_file, )

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources