variantplaner

VariantPlaner, a toolkit to manage many variants without requiring large CPU and RAM resources.

Convert a vcf into parquet, convert annotations into parquet, convert parquet into vcf.

It also builds a file structure that keeps variant database interrogation times fast.
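
As a quick orientation, here is a minimal usage sketch, not taken from the project documentation: it assumes a local sample.vcf whose header carries contig length lines, and uses the Vcf and VcfParsingBehavior classes listed below.

import pathlib

from variantplaner import Vcf, VcfParsingBehavior

# Parse the vcf; contig lengths are read from the header because chr2len_path is None
vcf = Vcf()
vcf.from_path(pathlib.Path("sample.vcf"), None, behavior=VcfParsingBehavior.MANAGE_SV)

# Extract genotypes and store them as parquet
genotypes = vcf.genotypes()
genotypes.lf.collect().write_parquet("genotypes.parquet")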

Modules:

  • cli

    Module containing the command line entry point functions.

  • exception

    Exceptions that can be raised by VariantPlaner.

  • generate

    Functions to generate information.

  • io

    Module managing input parsing and output serialization.

  • normalization

    Functions used to normalize data.

  • objects

    Module storing variantplaner objects.

  • struct

    Generated data structures for easy integration.

Classes:

  • Annotations

    Object to manage lazyframe as Annotations.

  • ContigsLength

    Store contigs -> length information.

  • Genotypes

    Object to manage lazyframe as Genotypes.

  • Pedigree

    Object to manage lazyframe as Pedigree.

  • Variants

    Object to manage lazyframe as Variants.

  • Vcf

    Object to manage lazyframe as Vcf.

  • VcfHeader

    Object that parses and stores vcf header information.

  • VcfParsingBehavior

    Enumeration used to control the behavior of IntoLazyFrame.

Annotations

Annotations()

Bases: LazyFrame

Object to manage lazyframe as Annotations.

Methods:

  • minimal_schema

    Get minimal schema of annotations polars.LazyFrame.

Source code in src/variantplaner/objects/annotations.py
def __init__(self):
    """Initialize a Annotations object."""
    self.lf = polars.LazyFrame(schema=Annotations.minimal_schema())

minimal_schema classmethod

minimal_schema() -> dict[str, type]

Get minimal schema of annotations polars.LazyFrame.

Source code in src/variantplaner/objects/annotations.py
@classmethod
def minimal_schema(cls) -> dict[str, type]:
    """Get minimal schema of genotypes polars.LazyFrame."""
    return {
        "id": polars.UInt64,
    }

ContigsLength

ContigsLength()

Store contigs -> length information.

Methods:

Source code in src/variantplaner/objects/contigs_length.py
def __init__(self):
    """Initialise a contigs length."""
    self.lf = polars.LazyFrame(
        schema={
            "contig": polars.String,
            "length": polars.UInt64,
            "offset": polars.UInt64,
        }
    )

from_path

from_path(
    path: Path, /, **scan_csv_args: Unpack[ScanCsv]
) -> int

Fill the object from the file pointed to by a pathlib.Path.

Argument: path: path of the input file

Returns: Number of contig lines read

Source code in src/variantplaner/objects/contigs_length.py
def from_path(self, path: pathlib.Path, /, **scan_csv_args: Unpack[ScanCsv]) -> int:
    """Fill object with file point by pathlib.Path.

    Argument:
    path: path of input file

    Returns: Number of contigs line view
    """
    csv = Csv()
    csv.from_path(path, **scan_csv_args)
    self.lf = csv.lf

    self.__compute_offset()

    return self.lf.collect(engine="cpu").shape[0]

from_vcf_header

from_vcf_header(header: VcfHeader) -> int

Fill the object from a VcfHeader.

Argument

header: VcfHeader

Returns: Number of contig lines read

Source code in src/variantplaner/objects/contigs_length.py
def from_vcf_header(self, header: VcfHeader) -> int:
    """Fill a object with VcfHeader.

    Argument:
       header: VcfHeader

    Returns: Number of contigs line view
    """
    contigs_id = re.compile(r"ID=(?P<id>[^,]+)")
    contigs_len = re.compile(r"length=(?P<length>[^,>]+)")

    count = 0
    contigs2len: dict[str, list] = {"contig": [], "length": []}
    for contig_line in header.contigs:
        if (len_match := contigs_len.search(contig_line)) and (id_match := contigs_id.search(contig_line)):
            contigs2len["contig"].append(id_match.groupdict()["id"])
            contigs2len["length"].append(int(len_match.groupdict()["length"]))
        count += 1

    self.lf = polars.LazyFrame(contigs2len, schema={"contig": polars.String, "length": polars.UInt64})

    self.__compute_offset()

    return count
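
A minimal sketch of how from_vcf_header can be used, assuming a header-only file read with VcfHeader.from_files (the path is hypothetical):

import pathlib

from variantplaner import ContigsLength, VcfHeader

header = VcfHeader()
header.from_files(pathlib.Path("header_only.txt"))  # hypothetical file holding only vcf header lines

contigs = ContigsLength()
nb_lines = contigs.from_vcf_header(header)  # number of contig lines seen in the header
print(nb_lines, contigs.lf.collect())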

Genotypes

Genotypes(data: LazyFrame | None = None)

Bases: LazyFrame

Object to manage lazyframe as Genotypes.

Methods:

Source code in src/variantplaner/objects/genotypes.py
def __init__(self, data: polars.LazyFrame | None = None):
    """Initialize a Genotypes object."""
    if data is None:
        self.lf = polars.LazyFrame(schema=Genotypes.minimal_schema())
    else:
        self.lf = data

minimal_schema classmethod

minimal_schema() -> dict[str, type]

Get minimal schema of genotypes polars.LazyFrame.

Source code in src/variantplaner/objects/genotypes.py
@classmethod
def minimal_schema(cls) -> dict[str, type]:
    """Get minimal schema of genotypes polars.LazyFrame."""
    return {
        "id": polars.UInt64,
        "sample": polars.String,
    }

samples_names

samples_names() -> list[str]

Get the list of sample names.

Source code in src/variantplaner/objects/genotypes.py
def samples_names(self) -> list[str]:
    """Get list of sample name."""
    return self.lf.select("sample").unique("sample").collect().get_column("sample").to_list()
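
A small sketch showing samples_names on a hand-built lazyframe (the sample values are made up):

import polars

from variantplaner import Genotypes

lf = polars.LazyFrame(
    {"id": [1, 1, 2], "sample": ["alice", "bob", "alice"], "gt": [1, 2, 1]},
    schema_overrides={"id": polars.UInt64},
)

genotypes = Genotypes(lf)
print(genotypes.samples_names())  # ['alice', 'bob'] (order not guaranteed)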

Pedigree

Pedigree()

Bases: LazyFrame

Object to manage lazyframe as Pedigree.

Methods:

Source code in src/variantplaner/objects/pedigree.py
def __init__(self):
    """Initialize a Variants object."""
    self.lf = polars.LazyFrame(schema=Pedigree.minimal_schema())

from_path

from_path(input_path: Path) -> None

Read a pedigree file into a polars.LazyFrame.

Parameters:

  • input_path (Path) –

    Path to pedigree file.

Returns:

  • None

    A polars.LazyFrame that contains ped information ('family_id', 'personal_id', 'father_id', 'mother_id', 'sex', 'affected')

Source code in src/variantplaner/objects/pedigree.py
def from_path(self, input_path: pathlib.Path) -> None:
    """Read a pedigree file in [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html).

    Args:
        input_path: Path to pedigree file.

    Returns:
        A [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html) that contains ped information ('family_id', 'personal_id', 'father_id', 'mother_id', 'sex', 'affected')
    """
    self.lf = polars.scan_csv(
        input_path,
        separator="\t",
        has_header=False,
        null_values=["None", "unknown"],
        new_columns=[
            "family_id",
            "personal_id",
            "father_id",
            "mother_id",
            "sex",
            "affected",
        ],
        schema_overrides=Pedigree.minimal_schema(),
    )

minimal_schema classmethod

minimal_schema() -> Mapping[str, PolarsDataType]

Get schema of pedigree polars.LazyFrame.

Source code in src/variantplaner/objects/pedigree.py
@classmethod
def minimal_schema(
    cls,
) -> collections.abc.Mapping[str, polars._typing.PolarsDataType]:
    """Get schema of variants polars.LazyFrame."""
    return {
        "family_id": polars.String,
        "personal_id": polars.String,
        "father_id": polars.String,
        "mother_id": polars.String,
        "sex": polars.String,
        "affected": polars.Boolean,
    }

to_path

to_path(output_path: Path) -> None

Write pedigree polars.LazyFrame in ped format.

Warning: This function performs polars.LazyFrame.collect before writing the csv, which can have a significant impact on memory usage.

Parameters:

  • lf

    LazyFrame containing pedigree information.

  • output_path (Path) –

    Path where the pedigree information is written.

Returns:

  • None

    None

Source code in src/variantplaner/objects/pedigree.py
def to_path(self, output_path: pathlib.Path) -> None:
    """Write pedigree [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html) in ped format.

    Warning: This function performs [polars.LazyFrame.collect][] before write csv, this can have a significant impact on memory usage

    Args:
        lf: LazyFrame contains pedigree information.
        output_path: Path where write pedigree information.

    Returns:
        None
    """
    self.lf.collect(engine="cpu").write_csv(output_path, include_header=False, separator="\t")
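
A short round-trip sketch combining from_path and to_path, assuming a tab-separated pedigree file at a hypothetical path:

import pathlib

from variantplaner import Pedigree

pedigree = Pedigree()
pedigree.from_path(pathlib.Path("family.ped"))  # hypothetical tab-separated ped file
print(pedigree.lf.collect())

pedigree.to_path(pathlib.Path("family_copy.ped"))  # collects, then writes in ped format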

Variants

Variants(data: LazyFrame | None = None)

Bases: LazyFrame

Object to manage lazyframe as Variants.

Methods:

Source code in src/variantplaner/objects/variants.py
def __init__(self, data: polars.LazyFrame | None = None):
    """Initialize a Variants object."""
    if data is None:
        self.lf = polars.LazyFrame(schema=Variants.minimal_schema())
    else:
        self.lf = data

minimal_schema classmethod

minimal_schema() -> dict[str, type]

Get schema of variants polars.LazyFrame.

Source code in src/variantplaner/objects/variants.py
@classmethod
def minimal_schema(cls) -> dict[str, type]:
    """Get schema of variants polars.LazyFrame."""
    return {
        "id": polars.UInt64,
        "chr": polars.String,
        "pos": polars.UInt64,
        "ref": polars.String,
        "alt": polars.String,
    }
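
A minimal sketch of building a Variants object that respects this schema (the values are made up):

import polars

from variantplaner import Variants

data = polars.LazyFrame(
    {"id": [1], "chr": ["1"], "pos": [10146], "ref": ["AC"], "alt": ["A"]},
    schema_overrides=Variants.minimal_schema(),
)

variants = Variants(data)
print(variants.lf.collect())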

Vcf

Vcf()

Object to manage lazyframe as Vcf.

Methods:

Source code in src/variantplaner/objects/vcf.py
def __init__(self):
    """Initialize a Vcf object."""
    self.lf = polars.LazyFrame(schema=Variants.minimal_schema())

    self.header = VcfHeader()

add_genotypes

add_genotypes(genotypes_lf: Genotypes) -> None

Add genotype information to the vcf.

Source code in src/variantplaner/objects/vcf.py
def add_genotypes(self, genotypes_lf: Genotypes) -> None:
    """Add genotypes information in vcf."""
    for sample in genotypes_lf.samples_names():
        geno2sample = (
            genotypes_lf.lf.filter(polars.col("sample") == sample)
            .rename(
                {col: f"{sample}_{col}" for col in genotypes_lf.lf.collect_schema().names()[2:]},
            )
            .drop("sample")
        )
        self.lf = self.lf.join(geno2sample, on="id", how="full", coalesce=True)

annotations

annotations(
    select_info: set[str] | None = None,
) -> Annotations

Get annotations of vcf.

Source code in src/variantplaner/objects/vcf.py
def annotations(self, select_info: set[str] | None = None) -> Annotations:
    """Get annotations of vcf."""
    lf = self.lf.with_columns(self.header.info_parser(select_info))

    return lf.drop("chr", "pos", "ref", "alt", "format", "info")

from_path

from_path(
    path: Path,
    chr2len_path: Path | None,
    behavior: VcfParsingBehavior = NOTHING,
) -> None

Populate Vcf object with vcf file.

Source code in src/variantplaner/objects/vcf.py
def from_path(
    self,
    path: pathlib.Path,
    chr2len_path: pathlib.Path | None,
    behavior: VcfParsingBehavior = VcfParsingBehavior.NOTHING,
) -> None:
    """Populate Vcf object with vcf file."""
    with xopen.xopen(path) as fh:
        try:
            self.header.from_lines(fh)
        except NotVcfHeaderError as e:
            raise NotAVCFError(path) from e

    chr2len = ContigsLength()
    if chr2len_path is not None:
        if chr2len.from_path(chr2len_path) == 0 and chr2len.from_vcf_header(self.header) == 0:
            raise NoContigsLengthInformationError
    elif chr2len.from_vcf_header(self.header) == 0:
        raise NoContigsLengthInformationError

    self.lf = polars.scan_csv(
        path,
        separator="\t",
        comment_prefix="#",
        has_header=False,
        schema_overrides=Vcf.schema(),
        new_columns=list(Vcf.schema().keys()),
    )

    schema = self.lf.collect_schema()
    self.lf = self.lf.rename(dict(zip(schema.names(), self.header.column_name(schema.len()))))
    self.lf = self.lf.cast(Vcf.schema())

    if behavior & VcfParsingBehavior.MANAGE_SV:
        self.lf = self.lf.with_columns(self.header.info_parser({"SVTYPE", "SVLEN"}))

    if not (behavior & VcfParsingBehavior.KEEP_STAR):
        self.lf = self.lf.filter(polars.col("alt") != "*")

    self.lf = normalization.add_variant_id(self.lf, chr2len.lf)

    if behavior & VcfParsingBehavior.MANAGE_SV:
        self.lf = self.lf.drop("SVTYPE", "SVLEN", strict=False)
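
A usage sketch combining behavior flags with an explicit contig length companion file; both paths are hypothetical, and the companion file is assumed to be a csv exposing contig and length columns:

import pathlib

from variantplaner import Vcf, VcfParsingBehavior

behavior = VcfParsingBehavior.MANAGE_SV | VcfParsingBehavior.KEEP_STAR

vcf = Vcf()
vcf.from_path(
    pathlib.Path("sample.vcf"),
    pathlib.Path("contigs_length.csv"),  # hypothetical companion file with contig lengths
    behavior=behavior,
)
print(vcf.lf.collect_schema().names())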

genotypes

genotypes() -> Genotypes

Get genotype of vcf.

Source code in src/variantplaner/objects/vcf.py
def genotypes(self) -> Genotypes:
    """Get genotype of vcf."""
    schema = self.lf.collect_schema()

    if "format" not in schema.names():
        raise NoGenotypeError

    lf = self.lf.select([*schema.names()[schema.names().index("format") :]])
    schema = lf.collect_schema()

    # Split genotype column in sub value
    col2expr = self.header.format_parser()

    format_strs = lf.select("format").unique().collect().get_column("format")

    sublfs = []
    for fstr in format_strs.to_list():
        # Found index of genotyping value
        col_index = {
            key: index
            for (index, key) in enumerate(
                fstr.split(":"),
            )
        }

        sublf = lf.filter(polars.col("format") == fstr)

        # Pivot value
        sublf = sublf.unpivot(index=["id"]).with_columns(
            [
                polars.col("id"),
                polars.col("variable").alias("sample"),
                polars.col("value").str.split(":"),
            ],
        )

        conversion = []
        for col in col2expr:
            if col in col_index:
                conversion.append(
                    polars.col("value")
                    .list.get(col_index[col], null_on_oob=True)
                    .pipe(function=col2expr[col], col_name=col)
                )
            else:
                conversion.append(polars.lit("").pipe(function=col2expr[col], col_name=col))

        sublf = sublf.with_columns(conversion)

        sublfs.append(sublf)

    genotypes = Genotypes()
    lf = polars.concat(sublfs).drop("variable", "value")

    if "gt".upper() in col2expr:
        lf = lf.filter(polars.col("gt") != 0)

    genotypes.lf = lf

    return genotypes
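
Continuing the earlier sketch, a short example of pulling genotypes out of a parsed Vcf; NoGenotypeError is raised when the vcf has no format column:

genotypes = vcf.genotypes()
print(genotypes.samples_names())
print(genotypes.lf.collect_schema().names())  # id, sample, then one column per FORMAT field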

schema classmethod

schema() -> Mapping[str, PolarsDataType]

Get schema of Vcf polars.LazyFrame.

Source code in src/variantplaner/objects/vcf.py
@classmethod
def schema(cls) -> collections.abc.Mapping[str, polars._typing.PolarsDataType]:
    """Get schema of Vcf polars.LazyFrame."""
    return {
        "chr": polars.String,
        "pos": polars.UInt64,
        "vid": polars.String,
        "ref": polars.String,
        "alt": polars.String,
        "qual": polars.String,
        "filter": polars.String,
        "info": polars.String,
    }

set_variants

set_variants(variants: Variants) -> None

Set variants of vcf.

Source code in src/variantplaner/objects/vcf.py
def set_variants(self, variants: Variants) -> None:
    """Set variants of vcf."""
    self.lf = variants.lf

variants

variants() -> Variants

Get variants of vcf.

Source code in src/variantplaner/objects/vcf.py
def variants(self) -> Variants:
    """Get variants of vcf."""
    return self.lf.select(Variants.minimal_schema().keys())

VcfHeader

VcfHeader()

Object that parses and stores vcf header information.

Methods:

Attributes:

  • contigs (Iterator[str]) –

    Get an iterator over the header lines containing chromosome information.

  • samples_index (dict[str, int] | None) –

    Read vcf header to generate an association map between sample name and index.

Source code in src/variantplaner/objects/vcf_header.py
def __init__(self):
    """Initialise VcfHeader."""
    self._header = []

contigs cached property

contigs: Iterator[str]

Get an iterator over the header lines containing chromosome information.

Returns: String iterator

samples_index cached property

samples_index: dict[str, int] | None

Read vcf header to generate an association map between sample name and index.

Args: header: Header string.

Returns: Map that associates a sample name to its sample index.

Raises: NotVcfHeaderError: If no header line starts with '#CHROM'

build_metadata

build_metadata(
    select_columns: list[str] | None = None,
) -> dict[str, str]

Generate metadata associated with the vcf header.

Args: select_columns: Output only columns in this list.

Returns: A map associating each column name with its corresponding header line

Source code in src/variantplaner/objects/vcf_header.py
def build_metadata(self, select_columns: list[str] | None = None) -> dict[str, str]:
    """Generate metadata associate to vcf_header.

    Args:
    select_columns: Output only columns in this list.

    Returns: An associations map for column name to corresponding header line
    """
    metadata: dict[str, str] = {}

    for line in self._header:
        if line.startswith("#CHROM"):
            return metadata

        if (
            (line.startswith("##FORMAT"))
            and (search := FORMAT_RE.search(line))
            and (not select_columns or search["id"].lower() in select_columns)
        ):
            metadata[search["id"].lower()] = line.lstrip("#")

        if (
            line.startswith("##INFO")
            and (search := INFO_RE.search(line))
            and (not select_columns or search["id"].lower() in select_columns)
        ):
            metadata[search["id"].lower()] = line.lstrip("#")

    return metadata

column_name

column_name(
    number_of_column: int = MINIMAL_COL_NUMBER,
) -> Iterator[str]

Get an iterator of the correct column names.

Returns: String iterator

Source code in src/variantplaner/objects/vcf_header.py
def column_name(self, number_of_column: int = MINIMAL_COL_NUMBER) -> typing.Iterator[str]:
    """Get an iterator of correct column name.

    Returns: String iterator
    """
    base_col_name = ["chr", "pos", "vid", "ref", "alt", "qual", "filter", "info"]

    yield from base_col_name

    if number_of_column > MINIMAL_COL_NUMBER and (samples := self.samples_index):
        yield "format"
        yield from (sample for (sample, _) in samples.items())

format_parser

format_parser(
    select_format: set[str] | None = None,
) -> dict[str, Callable[[Expr, str], Expr]]

Generate a mapping of pipeable functions, built on polars.Expr, to extract genotype information.

Warning: Float values can't be converted for the moment; they are stored as String to keep the information.

Args: select_format: Set of target FORMAT field ids.

Returns: A dict linking each format id to a pipeable function over polars.Expr

Raises: NotVcfHeaderError: If no header line starts with '#CHROM'

Source code in src/variantplaner/objects/vcf_header.py
def format_parser(
    self,
    select_format: set[str] | None = None,
) -> dict[str, typing.Callable[[polars.Expr, str], polars.Expr]]:
    """Generate a list of [polars.Expr](https://pola-rs.github.io/polars/py-polars/html/reference/expressions/index.html) to extract genotypes information.

    **Warning**: Float values can't be converted for the moment they are stored as String to keep information

    Args:
    header: Line of vcf header.
    input_path: Path to vcf file.
    select_format: List of target format field.

    Returns:
    A dict to link format id to pipeable function with Polars.Expr

    Raises:
    NotVcfHeaderError: If all line not start by '#CHR'
    """
    expressions: dict[str, typing.Callable[[polars.Expr, str], polars.Expr]] = {}

    for line in self._header:
        if line.startswith("#CHROM"):
            return expressions

        if not line.startswith("##FORMAT"):
            continue

        if (search := FORMAT_RE.search(line)) and (not select_format or search["id"] in select_format):
            name = search["id"]
            number = search["number"]
            format_type = search["type"]

            if name == "GT":
                expressions["GT"] = VcfHeader.__format_gt
                continue

            if number == "1":
                if format_type == "Integer":
                    expressions[name] = VcfHeader.__format_one_int
                elif format_type == "Float":  # noqa: SIM114 Float isn't already support but in future
                    expressions[name] = VcfHeader.__format_one_str
                elif format_type in {"String", "Character"}:
                    expressions[name] = VcfHeader.__format_one_str
                else:
                    pass  # Not reachable

            elif format_type == "Integer":
                expressions[name] = VcfHeader.__format_list_int
            elif format_type == "Float":  # noqa: SIM114 Float isn't already support but in future
                expressions[name] = VcfHeader.__format_list_str
            elif format_type in {"String", "Character"}:
                expressions[name] = VcfHeader.__format_list_str
            else:
                pass  # Not reachable

    raise NotVcfHeaderError

from_files

from_files(path: Path) -> None

Populate the VcfHeader object with the content of a header-only file.

Args: path: Path of file

Returns: None

Source code in src/variantplaner/objects/vcf_header.py
def from_files(self, path: pathlib.Path) -> None:
    """Populate VcfHeader object with content of only header file.

    Args:
    path: Path of file

    Returns:
    None
    """
    with open(path) as fh:
        for full_line in fh:
            line = full_line.strip()
            self._header.append(line)

from_lines

from_lines(lines: Iterator[str]) -> None

Extract all header information from vcf lines.

Header lines are those between the start of the file and the first line that starts with '#CHROM' or does not start with '#'.

Args: lines: Iterator of lines

Returns: None

Raises: NotVcfHeaderError: If a line does not start with '#'. NotVcfHeaderError: If no line starts with '#CHROM'

Source code in src/variantplaner/objects/vcf_header.py
def from_lines(self, lines: typing.Iterator[str]) -> None:
    """Extract all header information of vcf lines.

    Line between start of file and first line start with '#CHROM' or not start with '#'

    Args:
    lines: Iterator of line

    Returns: None

    Raises:
    NotAVcfHeader: If a line not starts with '#'
    NotAVcfHeader: If no line start by '#CHROM'
    """
    for full_line in lines:
        line = full_line.strip()

        if not line.startswith("#"):
            raise NotVcfHeaderError

        if line.startswith("#CHROM"):
            self._header.append(line)
            return

        self._header.append(line)

    raise NotVcfHeaderError

info_parser

info_parser(
    select_info: set[str] | None = None,
) -> list[Expr]

Generate a list of polars.Expr to extract variant information.

Args: select_info: Set of target INFO field ids.

Returns: List of polars.Expr to parse the info column.

Raises: NotVcfHeaderError: If no header line starts with '#CHROM'

Source code in src/variantplaner/objects/vcf_header.py
def info_parser(self, select_info: set[str] | None = None) -> list[polars.Expr]:
    """Generate a list of [polars.Expr](https://pola-rs.github.io/polars/py-polars/html/reference/expressions/index.html) to extract variants information.

    Args:
    header: Line of vcf header
    input_path: Path to vcf file.
    select_info: List of target info field

    Returns:
    List of [polars.Expr](https://pola-rs.github.io/polars/py-polars/html/reference/expressions/index.html) to parse info columns.

    Raises:
    NotVcfHeaderError: If all line not start by '#CHR'
    """
    expressions: list[polars.Expr] = []

    for line in self._header:
        if line.startswith("#CHROM"):
            return expressions

        if not line.startswith("##INFO"):
            continue

        if (search := INFO_RE.search(line)) and (not select_info or search["id"] in select_info):
            regex = rf"{search['id']}=([^;]+);?"

            local_expr = polars.col("info").str.extract(regex, 1)

            if search["number"] == "1":
                if search["type"] == "Integer":
                    local_expr = local_expr.cast(polars.Int64)
                elif search["type"] == "Float":
                    local_expr = local_expr.cast(polars.Float64)
                elif search["type"] in {"String", "Character"}:
                    pass  # Not do anything on string or character
                else:
                    pass  # Not reachable

            else:
                local_expr = local_expr.str.split(",")
                if search["type"] == "Integer":
                    local_expr = local_expr.cast(polars.List(polars.Int64))
                elif search["type"] == "Float":
                    local_expr = local_expr.cast(polars.List(polars.Float64))
                elif search["type"] in {"String", "Character"}:
                    pass  # Not do anything on string or character
                else:
                    pass  # Not reachable

            expressions.append(local_expr.alias(search["id"]))

    raise NotVcfHeaderError
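
A short sketch of info_parser applied to a Vcf parsed in the earlier examples; the INFO ids below (AF, DP) are hypothetical and must exist in the header:

info_exprs = vcf.header.info_parser({"AF", "DP"})
annotated = vcf.lf.with_columns(info_exprs)
print(annotated.collect_schema().names())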

VcfParsingBehavior

Bases: IntFlag

Enumeration used to control the behavior of IntoLazyFrame.

Attributes:

  • KEEP_STAR

    Keep star variant.

  • MANAGE_SV

    into_lazyframe tries to avoid structural variant id collisions; SVTYPE/SVLEN info values must be present.

  • NOTHING

    into_lazyframe does not have any specific behavior.

KEEP_STAR class-attribute instance-attribute

KEEP_STAR = auto()

Keep star variant.

MANAGE_SV class-attribute instance-attribute

MANAGE_SV = auto()

into_lazyframe tries to avoid structural variant id collisions; SVTYPE/SVLEN info values must be present.

NOTHING class-attribute instance-attribute

NOTHING = auto()

into_lazyframe does not have any specific behavior.

any2string

any2string(value: Any) -> str

Convert any value into a string. Used for temp file creation.

Source code in src/variantplaner/__init__.py
def any2string(value: typing.Any) -> str:
    """Convert an int in a string. Use for temp file creation."""
    return str(uuid.uuid5(uuid.NAMESPACE_URL, str(value)))

exception

Exceptions that can be raised by VariantPlaner.

Classes:

  • NoContigsLengthInformationError

    Exception raised if contigs length information is not found in the vcf or in a companion file.

  • NoGTError

    Exception raised if the genotype polars.LazyFrame does not contain a gt column.

  • NoGenotypeError

    Exception raised if the vcf file does not seem to contain genotype information.

  • NotAVCFError

    Exception raised if the file read does not seem to be a vcf, generally because it does not contain a line starting with '#CHROM'.

  • NotAVariantCsvError

    Exception raised if a csv file that should contain variant information has column names that do not match the minimal requirements.

  • NotVcfHeaderError

    Exception raised if the header isn't compatible with vcf.

NoContigsLengthInformationError

NoContigsLengthInformationError()

Bases: Exception

Exception raised if contigs length information is not found in the vcf or in a companion file.

Source code in src/variantplaner/exception.py
def __init__(self):
    """Initize no contigs length information error."""
    super().__init__("Contigs length information is required in vcf header of in compagnion file.")

NoGTError

NoGTError(message: str)

Bases: Exception

Exception raised if the genotype polars.LazyFrame does not contain a gt column.

Source code in src/variantplaner/exception.py
def __init__(self, message: str):
    """Initialize no gt error."""
    super().__init__(f"In {message} gt column is missing.")

NoGenotypeError

NoGenotypeError()

Bases: Exception

Exception raised if the vcf file does not seem to contain genotype information.

Source code in src/variantplaner/exception.py
def __init__(self):
    """Initialize no genotype error."""
    super().__init__("LazyFrame seems not contains genotypes.")

NotAVCFError

NotAVCFError(path: Path)

Bases: Exception

Exception raised if the file read does not seem to be a vcf, generally because it does not contain a line starting with '#CHROM'.

Source code in src/variantplaner/exception.py
def __init__(self, path: pathlib.Path):
    """Initialize not a vcf error."""
    super().__init__(f"File {path} seems not be a valid vcf file.")

NotAVariantCsvError

NotAVariantCsvError(path: Path)

Bases: Exception

Exception raised if a csv file that should contain variant information has column names that do not match the minimal requirements.

Source code in src/variantplaner/exception.py
def __init__(self, path: pathlib.Path):
    """Initialize not a variant csv error."""
    super().__init__(f"{path} seems not be a csv variant.")

NotVcfHeaderError

NotVcfHeaderError()

Bases: Exception

Exception raised if the header isn't compatible with vcf.

Source code in src/variantplaner/exception.py
def __init__(self):
    """Initialize not a vcf header error."""
    super().__init__("Not a vcf header")

generate

Functions to generate information.

Functions:

transmission

transmission(
    genotypes_lf: LazyFrame,
    index_names: tuple[str],
    mother_names: tuple[str | None] = (None,),
    father_names: tuple[str | None] = (None,),
) -> DataFrame | None

Compute how each variant is transmitted to the index case.

Parameters:

  • genotypes_lf (LazyFrame) –

    Genotypes polars.LazyFrame; the gt column is required.

  • index_names

    Sample names of the index cases.

  • mother_names

    Sample names of the mothers.

  • father_names

    Sample names of the fathers.

Returns:

  • DataFrame | None

    polars.DataFrame with transmission information, including genotyping information for the index, mother and father. If any of them isn't present, values are set to polars.Null. The transmission column contains a string: concat(chr(index_gt + 33), chr(mother_gt + 33), chr(father_gt + 33)); for example, transmission #~! means a homozygous diploid variant not present in the father, with no information about the mother.

Raises:

  • NoGTError

    If genotypes_lf does not contain a gt column.

Source code in src/variantplaner/generate.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def transmission(
    genotypes_lf: polars.LazyFrame,
    index_names: tuple[str],
    mother_names: tuple[str | None] = (None,),
    father_names: tuple[str | None] = (None,),
) -> polars.DataFrame | None:
    """Compute how each variant are transmite to index case.

    Args:
        genotypes_lf: Genotypes [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html), `gt` column are required.
        index_name: Sample name of index case.
        mother_name: Sample name of mother.
        father_name: Sample name of father.

    Returns:
         [polars.DataFrame](https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/index.html) with transmission information. With genotyping information for index, mother and father. If any of them isn't present value are set to polars.Null. Columns transmission contains a string: concat(chr(index_gt + 33), chr(mother_gt + 33), chr(father_gt + 33)), transmission: `#~!` mean homozygote diploide variant not present in father but with no information about mother.

    Raises:
        NoGTError: if genotypes_lf not containts gt column.
    """
    genotypes_column = list(genotypes_lf.collect_schema().names()[2:])
    if "gt" not in genotypes_column:
        raise NoGTError("genotype polars.LazyFrame")

    genotypes_df = genotypes_lf.collect()

    dfs = []
    for names in itertools.zip_longest(index_names, mother_names, father_names, fillvalue=None):
        df = __trio(genotypes_df, genotypes_column, *names)
        if df is not None:
            dfs.append(df)

    if len(dfs) > 0:
        return polars.concat(dfs)
    return None
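
A toy sketch of calling transmission, assuming a minimal genotypes lazyframe whose only genotyping column is gt (real data would usually carry more columns):

import polars

from variantplaner import generate

genotypes_lf = polars.LazyFrame(
    {
        "id": [1, 1, 1],
        "sample": ["child", "mother", "father"],
        "gt": [2, 1, 0],
    },
    schema_overrides={"id": polars.UInt64, "gt": polars.UInt8},
)

transmission_df = generate.transmission(genotypes_lf, ("child",), ("mother",), ("father",))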

transmission_ped

transmission_ped(
    genotypes_lf: LazyFrame, pedigree_lf: LazyFrame
) -> DataFrame | None

Compute transmission of each variant.

Warning: only the first sample with two parents is considered.

Parameters:

Returns:

  • DataFrame | None

    DataFrame with transmission information

Raises:

  • NoGTError

    If genotypes_lf does not contain a gt column.

Source code in src/variantplaner/generate.py
def transmission_ped(
    genotypes_lf: polars.LazyFrame,
    pedigree_lf: polars.LazyFrame,
) -> polars.DataFrame | None:
    """Compute transmission of each variants.

    **Warning**: only the first sample with two parent are considered.

    Args:
        genotypes_lf: Genotypes [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html), `gt` column are required.
        pedigree_lf: Pedigree [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html).

    Returns:
         DataFrame with transmission information

    Raises:
        NoGTError: If genotypes_lf not contains gt column.
    """
    pedigree_df = pedigree_lf.collect()

    if pedigree_df.height > 1:
        pedigree_df = pedigree_df.filter(polars.col("father_id").is_not_null() | polars.col("mother_id").is_not_null())

    return transmission(
        genotypes_lf,
        tuple(pedigree_df.get_column("personal_id").to_list()),
        tuple(pedigree_df.get_column("mother_id").to_list()),
        tuple(pedigree_df.get_column("father_id").to_list()),
    )

normalization

Functions used to normalize data.

Functions:

add_id_part

add_id_part(
    lf: LazyFrame, number_of_bits: int = 8
) -> LazyFrame

Add an id_part column.

If id is a large variant id value, id_part is set to 255; for other values, the 8 most significant bits are used.

Parameters:

Returns:

Source code in src/variantplaner/normalization.py
def add_id_part(lf: polars.LazyFrame, number_of_bits: int = 8) -> polars.LazyFrame:
    """Add column id part.

    If id is large variant id value, id_part are set to 255, other value most weigthed position 8 bits are use.

    Args:
        lf: [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html) contains: id column.

    Returns:
        [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html) with column id_part added
    """
    return lf.with_columns(id_part=variant_id.compute_part("id", number_of_bits=number_of_bits))

add_variant_id

add_variant_id(
    lf: LazyFrame, chrom2length: LazyFrame
) -> LazyFrame

Add an id column to variants.

Id computation works as follows.

Two different algorithms are used to calculate the variant identifier, depending on the cumulative length of the reference and alternative sequences.

If the cumulative length of the reference and alternative sequences is short, the leftmost bit of the id is set to 0, then a unique 63-bit hash of the variant is calculated.

If the cumulative length of the reference and alternative sequences is long, the right-most bit of the id has a value of 1, followed by a hash function (the one used in Firefox) of the chromosome, position, reference and alternative sequence, without the right-most bit.

If lf.columns contains SVTYPE and SVLEN, the alt of variants whose regex group in alt <([^:]+).*> matches SVTYPE is replaced by the concatenation of SVTYPE and the first SVLEN value.

Parameters:

Returns:

Source code in src/variantplaner/normalization.py
def add_variant_id(lf: polars.LazyFrame, chrom2length: polars.LazyFrame) -> polars.LazyFrame:
    """Add a column id of variants.

    Id computation is based on

    Two different algorithms are used to calculate the variant identifier, depending on the cumulative length of the reference and alternative sequences.

    If the cumulative length of the reference and alternative sequences is short, the leftmost bit of the id is set to 0, then a unique 63-bit hash of the variant is calculated.

    If the cumulative length of the reference and alternative sequences is long, the right-most bit of the id will have a value of 1, followed by a hash function, used in Firefox, of the chromosome, position, reference and alternative sequence without the right-most bit.

    If lf.columns contains SVTYPE and SVLEN variant with regex group in alt <([^:]+).*> match SVTYPE are replaced by concatenation of SVTYPE and SVLEN first value.

    Args:
        lf: [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html) contains: chr, pos, ref, alt columns.
        chrom2length: [polars.DataFrame](https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/index.html) contains: chr and length columns.

    Returns:
        [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html) with chr column normalized
    """
    real_pos_max = chrom2length.select([polars.col("length").sum()]).collect().get_column("length").max()

    large_variant_len = (64 - len(format(real_pos_max, "b")) - 2) // 2 + 1

    col_names = lf.collect_schema().names()
    if "SVTYPE" in col_names and "SVLEN" in col_names:
        lf = lf.with_columns(
            alt=polars.when(
                polars.col("alt").str.replace("<(?<type>[^:]+).*>", "$type") == polars.col("SVTYPE"),
            )
            .then(
                polars.col("alt")
                .str.replace(
                    ".+",
                    polars.concat_str(
                        [polars.col("SVTYPE"), polars.col("SVLEN").list.get(0)],
                        separator="-",
                    ),
                )
                .str.pad_end(large_variant_len, "-"),
            )
            .otherwise(
                polars.col("alt"),
            ),
        )

    lf = lf.with_columns(alt=polars.col("alt").str.replace("\\*", "*" * large_variant_len))
    lf = lf.join(chrom2length, right_on="contig", left_on="chr", how="left", coalesce=True)
    lf = lf.with_columns(real_pos=polars.col("pos") + polars.col("offset"))

    lf = lf.with_columns(
        id=variant_id.compute_id(
            "real_pos",
            "ref",
            "alt",
            real_pos_max,
        ),
    )

    return lf.drop(["real_pos", "length", "offset"])
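
A sketch of add_variant_id on hand-built lazyframes; the contig table below already carries the offset column that ContigsLength would normally compute, and all values are made up:

import polars

from variantplaner import normalization

chrom2length = polars.LazyFrame(
    {"contig": ["1", "2"], "length": [248956422, 242193529], "offset": [0, 248956422]},
    schema_overrides={"length": polars.UInt64, "offset": polars.UInt64},
)

variants = polars.LazyFrame(
    {"chr": ["1"], "pos": [10146], "ref": ["AC"], "alt": ["A"]},
    schema_overrides={"pos": polars.UInt64},
)

with_id = normalization.add_variant_id(variants, chrom2length)
print(with_id.collect())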

io

Module managing input parsing and output serialization.

Modules:

  • vcf

    Read and write vcf file.

vcf

Read and write vcf file.

Functions:

build_rename_column

build_rename_column(
    chromosome: str,
    pos: str,
    identifier: str,
    ref: str,
    alt: str,
    qual: str | None = ".",
    filter_col: str | None = ".",
    info: list[tuple[str, str]] | None = None,
    format_string: str | None = None,
    sample: dict[str, dict[str, str]] | None = None,
) -> RenameCol

A helper function to generate the rename column dict used as the renaming parameter of the variantplaner.io.vcf.lazyframe_in_vcf function.

Returns:

  • RenameCol

    A rename column dictionary.

Source code in src/variantplaner/io/vcf.py
def build_rename_column(
    chromosome: str,
    pos: str,
    identifier: str,
    ref: str,
    alt: str,
    qual: str | None = ".",
    filter_col: str | None = ".",
    info: list[tuple[str, str]] | None = None,
    format_string: str | None = None,
    sample: dict[str, dict[str, str]] | None = None,
) -> RenameCol:
    """A helper function to generate rename column dict for [variantplaner.io.vcf.lazyframe_in_vcf][] function parameter.

    Returns:
        A rename column dictionary.
    """
    return {
        "#CHROM": chromosome,
        "POS": pos,
        "ID": identifier,
        "REF": ref,
        "ALT": alt,
        "QUAL": "." if qual is None else qual,
        "FILTER": "." if filter_col is None else filter_col,
        "INFO": [] if info is None else info,
        "FORMAT": "" if format_string is None else format_string,
        "sample": {} if sample is None else sample,
    }
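
A sketch of build_rename_column for a lazyframe whose columns do not follow the default names (the source column names are made up):

from variantplaner.io import vcf as io_vcf

renaming = io_vcf.build_rename_column(
    "chromosome",    # column holding the chromosome
    "position",      # column holding the position
    "variant_name",  # column holding the identifier
    "reference",
    "alternative",
)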

lazyframe_in_vcf

lazyframe_in_vcf(
    lf: LazyFrame,
    output_path: Path,
    /,
    vcf_header: VcfHeader | None = None,
    renaming: RenameCol = DEFAULT_RENAME,
) -> None

Write polars.LazyFrame in vcf format.

Warning: This function performs polars.LazyFrame.collect before writing the vcf, which can have a significant impact on memory usage.

Parameters:

  • lf (LazyFrame) –

    LazyFrame containing the information to write.

  • output_path (Path) –

    Path where the vcf is written.

Returns:

  • None

    None

Source code in src/variantplaner/io/vcf.py
def lazyframe_in_vcf(
    lf: polars.LazyFrame,
    output_path: pathlib.Path,
    /,
    vcf_header: VcfHeader | None = None,
    renaming: RenameCol = DEFAULT_RENAME,
) -> None:
    """Write [polars.LazyFrame](https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html) in vcf format.

    Warning: This function performs [polars.LazyFrame.collect][] before write vcf, this can have a significant impact on memory usage.

    Args:
        lf: LazyFrame contains information.
        output_path: Path to where vcf to write.

    Returns:
        None
    """
    select_column: list[str] = []

    lf = lf.with_columns(
        [
            polars.col(renaming["#CHROM"]).alias("#CHROM"),
            polars.col(renaming["POS"]).alias("POS"),
            polars.col(renaming["ID"]).alias("ID"),
            polars.col(renaming["REF"]).alias("REF"),
            polars.col(renaming["ALT"]).alias("ALT"),
        ],
    )

    select_column.extend(["#CHROM", "POS", "ID", "REF", "ALT"])

    if vcf_header is None:
        header = __generate_header(lf, renaming["INFO"], list(renaming["sample"].keys()), renaming["FORMAT"])
    else:
        header = "\n".join(vcf_header._header)

    if renaming["QUAL"] != ".":
        lf = lf.with_columns([polars.col(renaming["QUAL"]).alias("QUAL")])
    else:
        lf = lf.with_columns([polars.lit(".").alias("QUAL")])

    select_column.append("QUAL")

    if renaming["FILTER"] != ".":
        lf = lf.with_columns([polars.col(renaming["FILTER"]).alias("FILTER")])
    else:
        lf = lf.with_columns([polars.lit(".").alias("FILTER")])

    select_column.append("FILTER")

    lf = (
        __rebuild_info_column(lf, renaming["INFO"])
        if renaming["INFO"]
        else lf.with_columns(polars.lit(".").alias("INFO"))
    )

    select_column.append("INFO")

    if renaming["FORMAT"]:
        lf = lf.with_columns(polars.lit(renaming["FORMAT"]).alias("FORMAT"))
        select_column.append("FORMAT")

    if renaming["FORMAT"] and renaming["sample"]:
        schema = lf.collect_schema()
        for sample_name in renaming["sample"]:
            lf = lf.with_columns(
                [
                    __lazy2format(
                        sample_name,
                        renaming["FORMAT"],
                        dict(zip(schema.names(), schema.dtypes())),
                    ).alias(sample_name),
                ],
            )
            select_column.append(sample_name)

    lf = lf.select([polars.col(col) for col in select_column])

    with open(output_path, "wb") as fh:
        fh.write(header.encode())
        fh.write(lf.collect().write_csv(separator="\t").encode())
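
A minimal write sketch reusing the renaming helper above on a tiny hand-built lazyframe (all values are made up):

import pathlib

import polars

from variantplaner.io import vcf as io_vcf

lf = polars.LazyFrame(
    {"chr": ["1"], "pos": [10146], "vid": ["."], "ref": ["AC"], "alt": ["A"]},
)

renaming = io_vcf.build_rename_column("chr", "pos", "vid", "ref", "alt")
io_vcf.lazyframe_in_vcf(lf, pathlib.Path("out.vcf"), renaming=renaming)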

struct

Generated data structures for easy integration.

Modules:

  • genotypes

    Functions related to genotype structuring.

  • variants

    Functions related to vcf structuring.

genotypes

Functions related to genotype structuring.

Functions:

  • hive

    Read all genotype parquet files and use the information to generate a hive-like structure, based on bits 63 to 55 (inclusive) of the variant id, with genotype information.

hive

hive(
    paths: list[Path],
    output_prefix: Path,
    threads: int,
    file_per_thread: int,
    *,
    append: bool,
    number_of_bits: int = 8,
) -> None

Read all genotype parquet files and use the information to generate a hive-like structure, based on bits 63 to 55 (inclusive) of the variant id, with genotype information.

The real number of threads used is equal to \(min(threads, len(paths))\).

The output layout looks like: {output_prefix}/id_part=[0..2.pow(number_of_bits)]/0.parquet.

Parameters:

  • paths (list[Path]) –

    list of files you want to reorganize

  • output_prefix (Path) –

    prefix of the hive

  • threads (int) –

    number of multiprocessing threads to run

  • file_per_thread (int) –

    number of files managed per multiprocessing thread

Returns:

  • None

    None

Source code in src/variantplaner/struct/genotypes.py
def hive(
    paths: list[pathlib.Path],
    output_prefix: pathlib.Path,
    threads: int,
    file_per_thread: int,
    *,
    append: bool,
    number_of_bits: int = 8,
) -> None:
    r"""Read all genotypes parquet file and use information to generate a hive like struct, based on 63rd and 55th bits included of variant id with genotype information.

    Real number of threads use are equal to $min(threads, len(paths))$.

    Output format look like: `{output_prefix}/id_part=[0..2.pow(number_of_bits)]/0.parquet`.

    Args:
        paths: list of file you want reorganize
        output_prefix: prefix of hive
        threads: number of multiprocessing threads run
        file_per_thread: number of file manage per multiprocessing threads

    Returns:
        None
    """
    logger.info(f"{paths=} {output_prefix=}, {threads=}, {file_per_thread=}, {append=} {number_of_bits=}")

    if len(paths) == 0:
        return

    for i in range(pow(2, number_of_bits)):
        (output_prefix / f"id_part={i}").mkdir(parents=True, exist_ok=True)

    path_groups: typing.Iterable[typing.Iterable[pathlib.Path]] = list(
        [[path] for path in paths]
        if file_per_thread < 2  # noqa: PLR2004 if number of file is lower than 2 file grouping isn't required
        else itertools.zip_longest(
            *[iter(paths)] * file_per_thread,
        ),
    )

    basenames = [
        variantplaner.any2string(hash("_".join(p.stem for p in g_paths if p is not None))) for g_paths in path_groups
    ]

    column_order = None
    lf_groups = []
    for g_paths in path_groups:
        group = []
        for p in g_paths:
            if p is None:
                continue

            lf = polars.scan_parquet(p, hive_partitioning=False)
            if column_order is None:
                column_order = lf.collect_schema().names()
            group.append(lf.select(column_order))

        lf_groups.append(group)

    logger.info(f"{path_groups=}, {basenames=}")

    with multiprocessing.get_context("spawn").Pool(threads) as pool:
        pool.starmap(
            __hive_worker,
            [(lf_group, basename, output_prefix, number_of_bits) for lf_group, basename in zip(lf_groups, basenames)],
        )

        pool.starmap(
            __merge_file,
            [(output_prefix / f"id_part={id_part}", basenames, append) for id_part in range(pow(2, number_of_bits))],
        )
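
A usage sketch of hive on genotype parquet files produced earlier (the paths are hypothetical):

import pathlib

from variantplaner.struct import genotypes

paths = [pathlib.Path("genotypes/sample1.parquet"), pathlib.Path("genotypes/sample2.parquet")]

genotypes.hive(
    paths,
    pathlib.Path("hive/genotypes"),
    threads=2,
    file_per_thread=1,
    append=False,
)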

variants

Functions related to vcf structuring.

Functions:

  • merge

    Merge multiple parquet variant files into one file.

merge

merge(
    paths: list[Path],
    output_prefix: Path,
    memory_limit: int = 10000000000,
    polars_threads: int = 4,
    *,
    append: bool,
) -> None

Merge multiple parquet variant files into one file.

This function generates temporary files; by default they are written in /tmp, but you can control where they are written by setting the TMPDIR, TEMP or TMP environment variables.

Parameters:

  • paths (list[Path]) –

    List of files you want chunked.

  • output_prefix

    Path where the merged variants are written.

  • memory_limit (int, default: 10000000000 ) –

    Size of each chunk in bytes.

Returns:

  • None

    None

Source code in src/variantplaner/struct/variants.py
def merge(
    paths: list[pathlib.Path],
    output_prefix: pathlib.Path,
    memory_limit: int = 10_000_000_000,
    polars_threads: int = 4,
    *,
    append: bool,
) -> None:
    """Perform merge of multiple parquet variants file in one file.

    These function generate temporary file, by default file are written in `/tmp` but you can control where these files are written by set TMPDIR, TEMP or TMP directory.

    Args:
        paths: List of file you want chunked.
        output: Path where variants is written.
        memory_limit: Size of each chunk in bytes.

    Returns:
        None
    """
    all_threads = int(os.environ["POLARS_MAX_THREADS"])
    multi_threads = max(all_threads // polars_threads, 1)
    os.environ["POLARS_MAX_THREADS"] = str(polars_threads)
    output_prefix.mkdir(parents=True, exist_ok=True)

    temp_prefix = pathlib.Path(tempfile.gettempdir()) / "variantplaner" / variantplaner.any2string(output_prefix)
    temp_prefix.mkdir(parents=True, exist_ok=True)

    # merge file -> split by chromosome perform unique
    logger.debug("Start split first file")
    base_inputs_outputs: list[tuple[list[pathlib.Path], pathlib.Path]] = []
    for input_chunk in __chunk_by_memory(paths, bytes_limit=memory_limit):
        local_out_prefix = temp_prefix / variantplaner.any2string(input_chunk)
        local_out_prefix.mkdir(parents=True, exist_ok=True)

        base_inputs_outputs.append((input_chunk, local_out_prefix))

    with multiprocessing.get_context("spawn").Pool(multi_threads) as pool:
        chr_names = set().union(*pool.starmap(__merge_split_unique, base_inputs_outputs))
    logger.debug("End split first first file")

    if append and output_prefix.exists():
        chr_names |= {entry.name.split(".")[0] for entry in os.scandir(output_prefix) if entry.is_file()}

    # iterate over chromosme
    logger.debug("Start merge by chromosome")
    for chr_name in chr_names:
        logger.debug(f"start chromosome: {chr_name}")

        chr_temp_prefix = temp_prefix / chr_name
        chr_temp_prefix.mkdir(parents=True, exist_ok=True)

        inputs = [
            path / f"{chr_name}.parquet"
            for (_, path) in base_inputs_outputs
            if (path / f"{chr_name}.parquet").is_file()
        ]

        if append and (output_prefix / f"{chr_name}.parquet").exists():
            inputs.append(output_prefix / f"{chr_name}.parquet")

        if not inputs:
            continue

        while len(inputs) > 1:
            new_inputs = []

            inputs_outputs = []
            for input_chunk in __chunk_by_memory(inputs, bytes_limit=memory_limit):
                logger.debug(f"{input_chunk}")
                if len(input_chunk) == 1:
                    new_inputs.append(input_chunk[0])
                elif len(input_chunk) > 1:
                    temp_output = chr_temp_prefix / variantplaner.any2string(input_chunk) / f"{chr_name}.parquet"
                    temp_output.parent.mkdir(parents=True, exist_ok=True)

                    new_inputs.append(temp_output)
                    inputs_outputs.append((input_chunk, temp_output))

            inputs = new_inputs

            with multiprocessing.get_context("spawn").Pool(multi_threads) as pool:
                pool.starmap(__merge_unique, inputs_outputs)

        shutil.move(inputs[0], output_prefix / f"{chr_name}.parquet")
        logger.debug(f"end chromosome: {chr_name}")
    logger.debug("End merge by chromosome")

    # Call cleanup to remove all tempfile generate durring merging
    logger.debug("Star clean tmp file")
    shutil.rmtree(temp_prefix, ignore_errors=True)
    logger.debug("End clean tmp file")