Source code for torchtext.datasets.wikitext2
import os
from typing import Union, Tuple
from torchtext._internal.module_utils import is_module_available
from torchtext.data.datasets_utils import (
_wrap_split_argument,
_create_dataset_directory,
)
if is_module_available("torchdata"):
from torchdata.datapipes.iter import FileOpener, HttpReader, IterableWrapper
URL = "https://s3.amazonaws.com/research.metamind.io/wikitext/wikitext-2-v1.zip"
MD5 = "542ccefacc6c27f945fb54453812b3cd"
NUM_LINES = {
"train": 36718,
"valid": 3760,
"test": 4358,
}
DATASET_NAME = "WikiText2"
_EXTRACTED_FILES = {
"train": os.path.join("wikitext-2", "wiki.train.tokens"),
"test": os.path.join("wikitext-2", "wiki.test.tokens"),
"valid": os.path.join("wikitext-2", "wiki.valid.tokens"),
}
[docs]@_create_dataset_directory(dataset_name=DATASET_NAME)
@_wrap_split_argument(("train", "valid", "test"))
def WikiText2(root: str, split: Union[Tuple[str], str]):
"""WikiText2 Dataset
For additional details refer to https://blog.salesforceairesearch.com/the-wikitext-long-term-dependency-language-modeling-dataset/
Number of lines per split:
- train: 36718
- valid: 3760
- test: 4358
Args:
root: Directory where the datasets are saved. Default: os.path.expanduser('~/.torchtext/cache')
split: split or splits to be returned. Can be a string or tuple of strings. Default: (`train`, `valid`, `test`)
:returns: DataPipe that yields text from Wikipedia articles
:rtype: str
"""
if not is_module_available("torchdata"):
raise ModuleNotFoundError(
"Package `torchdata` not found. Please install following instructions at `https://github.com/pytorch/data`"
)
url_dp = IterableWrapper([URL])
# cache data on-disk
cache_compressed_dp = url_dp.on_disk_cache(
filepath_fn=lambda x: os.path.join(root, os.path.basename(x)),
hash_dict={os.path.join(root, os.path.basename(URL)): MD5},
hash_type="md5",
)
cache_compressed_dp = HttpReader(cache_compressed_dp).end_caching(
mode="wb", same_filepath_fn=True
)
cache_decompressed_dp = cache_compressed_dp.on_disk_cache(
filepath_fn=lambda x: os.path.join(root, _EXTRACTED_FILES[split])
)
# Extract zip and filter the appropriate split file
cache_decompressed_dp = (
FileOpener(cache_decompressed_dp, mode="b")
.read_from_zip()
.filter(lambda x: _EXTRACTED_FILES[split] in x[0])
)
cache_decompressed_dp = cache_decompressed_dp.end_caching(
mode="wb", same_filepath_fn=True
)
data_dp = FileOpener(cache_decompressed_dp, mode="b")
return data_dp.readlines(strip_newline=False, decode=True, return_path=False)