Module cstag.call
Expand source code
from __future__ import annotations
from cstag.shorten import shorten
###########################################################
# Trim soft and hard clips from the CIGAR and sequence
###########################################################
def parse_cigar(cigar: str) -> list[tuple[str, int]]:
"""
Parse a CIGAR string into a list of tuples containing operation and length.
"""
parsed_cigar = []
start_idx = 0
for idx, operation in enumerate(cigar):
if operation.isdigit():
continue
length = int(cigar[start_idx:idx])
parsed_cigar.append((operation, length))
start_idx = idx + 1
return parsed_cigar
def parse_md(md: str) -> list[tuple[str, int]]:
"""
Parse an MD tag into a list of tuples containing operation and length.
"""
parsed_md = []
idx = 0
while idx < len(md):
if md[idx].isdigit():
start = idx
while idx < len(md) and md[idx].isdigit():
idx += 1
parsed_md.append(("=", int(md[start:idx])))
elif md[idx] == "^": # Deletion
start = idx
idx += 1
while idx < len(md) and not md[idx].isdigit():
idx += 1
parsed_md.append((md[start:idx], idx - start - 1))
else: # Mismatch
parsed_md.append((md[idx], 1))
idx += 1
return parsed_md
def join_cigar(cigar_tuples: list[tuple[str, int]]) -> str:
return "".join(f"{length}{operation}" for operation, length in cigar_tuples)
def trim_clips(cigar: str, seq: str) -> tuple[str, str]:
if all(x not in cigar for x in "SH"):
return cigar, seq
parsed_cigar = parse_cigar(cigar)
# trim soft clips of cigar and seq
if parsed_cigar[0][0] == "S":
length_softclip = parsed_cigar[0][1]
seq = seq[length_softclip:]
parsed_cigar = parsed_cigar[1:]
if parsed_cigar[-1][0] == "S":
length_softclip = parsed_cigar[-1][1]
seq = seq[:-length_softclip]
parsed_cigar = parsed_cigar[:-1]
# trim hard clips of cigar
if parsed_cigar[0][0] == "H":
parsed_cigar = parsed_cigar[1:]
if parsed_cigar[-1][0] == "H":
parsed_cigar = parsed_cigar[:-1]
cigar = join_cigar(parsed_cigar)
return cigar, seq
###########################################################
# Generate cs tag in long format
###########################################################
def expand_cigar_operations(cigar: str) -> list[str]:
parsed_cigar = parse_cigar(cigar)
expanded_list = []
for op, num in parsed_cigar:
if op in ["D", "N", "I"]:
expanded_list.append(op * num)
else:
expanded_list.extend([op] * num)
return expanded_list
def expand_md_operations(md: str) -> list[str]:
parsed_md = parse_md(md)
expanded_list = []
for op, num in parsed_md:
if op.startswith("^"):
expanded_list.append(op)
continue
expanded_list.extend([op] * num)
return expanded_list
def remove_consecutive_equals(cs_long: list[str]) -> list[str]:
if not cs_long:
return cs_long
for i, cs in enumerate(cs_long):
if i == 0:
prev_op = cs[0]
continue
curr_op = cs[0]
if prev_op == curr_op == "=":
cs_long[i] = cs[1:]
else:
prev_op = cs[0]
return cs_long
def generate_cs_long(cigar: str, md: str, seq: str) -> str:
cigar_list = expand_cigar_operations(cigar)
md_list = expand_md_operations(md)
idx_cigar, idx_md, idx_seq = 0, 0, 0
cs_long = []
while idx_seq < len(seq) and idx_cigar < len(cigar_list) and idx_md < len(md_list):
if cigar_list[idx_cigar] == "M":
if md_list[idx_md] == "=":
cs_long.append(f"={seq[idx_seq]}")
else:
cs_long.append(f"*{md_list[idx_md]}{seq[idx_seq]}".lower())
idx_cigar += 1
idx_md += 1
idx_seq += 1
continue
if cigar_list[idx_cigar][0] == "D":
cs_long.append(md_list[idx_md].replace("^", "-").lower())
idx_cigar += 1
idx_md += 1
continue
if cigar_list[idx_cigar][0] == "I":
num_insertion = len(cigar_list[idx_cigar])
seq_insertion = "".join(seq[idx_seq : idx_seq + num_insertion])
cs_long.append(f"+{seq_insertion}".lower())
idx_cigar += 1
idx_seq += num_insertion
continue
if cigar_list[idx_cigar][0] == "N":
cs_long.append(f"~nn{len(cigar_list[idx_cigar])}nn")
idx_cigar += 1
continue
return "".join(remove_consecutive_equals(cs_long))
def add_prefix(cs_tag: str) -> str:
return f"cs:Z:{cs_tag}"
###########################################################
# main
###########################################################
def call(cigar: str, md: str, seq: str, long: bool = False, prefix: bool = False) -> str:
"""
Generate a cs tag based on CIGAR, MD, and SEQ information.
Args:
cigar (str): CIGAR string representing the alignment.
md (str): MD tag representing mismatching positions/base.
seq (str): The sequence of the read.
long (bool, optional): Whether to return the cs tag in long format. Defaults to False.
prefix (bool, optional): Whether to add the prefix 'cs:Z:' to the cs tag. Defaults to False
Returns:
str: A cs tag representing the alignment and differences.
Example:
>>> import cstag
>>> cigar = "8M2D4M2I3N1M"
>>> md = "2A5^AG7"
>>> seq = "ACGTACGTACGTACG"
>>> cstag.call(cigar, md, seq, long=True)
'=AC*ag=TACGT-ag=ACGT+ac~nn3nn=G'
"""
cigar, seq = trim_clips(cigar, seq)
cs_tag = generate_cs_long(cigar, md, seq)
if long is False:
cs_tag = shorten(cs_tag)
if prefix is True:
cs_tag = add_prefix(cs_tag)
return cs_tag
Functions
def add_prefix(cs_tag: str) ‑> str
-
Expand source code
def add_prefix(cs_tag: str) -> str: return f"cs:Z:{cs_tag}"
def call(cigar: str, md: str, seq: str, long: bool = False, prefix: bool = False) ‑> str
-
Generate a cs tag based on CIGAR, MD, and SEQ information.
Args
cigar
:str
- CIGAR string representing the alignment.
md
:str
- MD tag representing mismatching positions/base.
seq
:str
- The sequence of the read.
long
:bool
, optional- Whether to return the cs tag in long format. Defaults to False.
prefix
:bool
, optional- Whether to add the prefix 'cs:Z:' to the cs tag. Defaults to False
Returns
str
- A cs tag representing the alignment and differences.
Example
>>> import cstag >>> cigar = "8M2D4M2I3N1M" >>> md = "2A5^AG7" >>> seq = "ACGTACGTACGTACG" >>> cstag.call(cigar, md, seq, long=True) '=AC*ag=TACGT-ag=ACGT+ac~nn3nn=G'
Expand source code
def call(cigar: str, md: str, seq: str, long: bool = False, prefix: bool = False) -> str: """ Generate a cs tag based on CIGAR, MD, and SEQ information. Args: cigar (str): CIGAR string representing the alignment. md (str): MD tag representing mismatching positions/base. seq (str): The sequence of the read. long (bool, optional): Whether to return the cs tag in long format. Defaults to False. prefix (bool, optional): Whether to add the prefix 'cs:Z:' to the cs tag. Defaults to False Returns: str: A cs tag representing the alignment and differences. Example: >>> import cstag >>> cigar = "8M2D4M2I3N1M" >>> md = "2A5^AG7" >>> seq = "ACGTACGTACGTACG" >>> cstag.call(cigar, md, seq, long=True) '=AC*ag=TACGT-ag=ACGT+ac~nn3nn=G' """ cigar, seq = trim_clips(cigar, seq) cs_tag = generate_cs_long(cigar, md, seq) if long is False: cs_tag = shorten(cs_tag) if prefix is True: cs_tag = add_prefix(cs_tag) return cs_tag
def expand_cigar_operations(cigar: str) ‑> list[str]
-
Expand source code
def expand_cigar_operations(cigar: str) -> list[str]: parsed_cigar = parse_cigar(cigar) expanded_list = [] for op, num in parsed_cigar: if op in ["D", "N", "I"]: expanded_list.append(op * num) else: expanded_list.extend([op] * num) return expanded_list
def expand_md_operations(md: str) ‑> list[str]
-
Expand source code
def expand_md_operations(md: str) -> list[str]: parsed_md = parse_md(md) expanded_list = [] for op, num in parsed_md: if op.startswith("^"): expanded_list.append(op) continue expanded_list.extend([op] * num) return expanded_list
def generate_cs_long(cigar: str, md: str, seq: str) ‑> str
-
Expand source code
def generate_cs_long(cigar: str, md: str, seq: str) -> str: cigar_list = expand_cigar_operations(cigar) md_list = expand_md_operations(md) idx_cigar, idx_md, idx_seq = 0, 0, 0 cs_long = [] while idx_seq < len(seq) and idx_cigar < len(cigar_list) and idx_md < len(md_list): if cigar_list[idx_cigar] == "M": if md_list[idx_md] == "=": cs_long.append(f"={seq[idx_seq]}") else: cs_long.append(f"*{md_list[idx_md]}{seq[idx_seq]}".lower()) idx_cigar += 1 idx_md += 1 idx_seq += 1 continue if cigar_list[idx_cigar][0] == "D": cs_long.append(md_list[idx_md].replace("^", "-").lower()) idx_cigar += 1 idx_md += 1 continue if cigar_list[idx_cigar][0] == "I": num_insertion = len(cigar_list[idx_cigar]) seq_insertion = "".join(seq[idx_seq : idx_seq + num_insertion]) cs_long.append(f"+{seq_insertion}".lower()) idx_cigar += 1 idx_seq += num_insertion continue if cigar_list[idx_cigar][0] == "N": cs_long.append(f"~nn{len(cigar_list[idx_cigar])}nn") idx_cigar += 1 continue return "".join(remove_consecutive_equals(cs_long))
def join_cigar(cigar_tuples: list[tuple[str, int]]) ‑> str
-
Expand source code
def join_cigar(cigar_tuples: list[tuple[str, int]]) -> str: return "".join(f"{length}{operation}" for operation, length in cigar_tuples)
def parse_cigar(cigar: str) ‑> list[tuple[str, int]]
-
Parse a CIGAR string into a list of tuples containing operation and length.
Expand source code
def parse_cigar(cigar: str) -> list[tuple[str, int]]: """ Parse a CIGAR string into a list of tuples containing operation and length. """ parsed_cigar = [] start_idx = 0 for idx, operation in enumerate(cigar): if operation.isdigit(): continue length = int(cigar[start_idx:idx]) parsed_cigar.append((operation, length)) start_idx = idx + 1 return parsed_cigar
def parse_md(md: str) ‑> list[tuple[str, int]]
-
Parse an MD tag into a list of tuples containing operation and length.
Expand source code
def parse_md(md: str) -> list[tuple[str, int]]: """ Parse an MD tag into a list of tuples containing operation and length. """ parsed_md = [] idx = 0 while idx < len(md): if md[idx].isdigit(): start = idx while idx < len(md) and md[idx].isdigit(): idx += 1 parsed_md.append(("=", int(md[start:idx]))) elif md[idx] == "^": # Deletion start = idx idx += 1 while idx < len(md) and not md[idx].isdigit(): idx += 1 parsed_md.append((md[start:idx], idx - start - 1)) else: # Mismatch parsed_md.append((md[idx], 1)) idx += 1 return parsed_md
def remove_consecutive_equals(cs_long: list[str]) ‑> list[str]
-
Expand source code
def remove_consecutive_equals(cs_long: list[str]) -> list[str]: if not cs_long: return cs_long for i, cs in enumerate(cs_long): if i == 0: prev_op = cs[0] continue curr_op = cs[0] if prev_op == curr_op == "=": cs_long[i] = cs[1:] else: prev_op = cs[0] return cs_long
def trim_clips(cigar: str, seq: str) ‑> tuple[str, str]
-
Expand source code
def trim_clips(cigar: str, seq: str) -> tuple[str, str]: if all(x not in cigar for x in "SH"): return cigar, seq parsed_cigar = parse_cigar(cigar) # trim soft clips of cigar and seq if parsed_cigar[0][0] == "S": length_softclip = parsed_cigar[0][1] seq = seq[length_softclip:] parsed_cigar = parsed_cigar[1:] if parsed_cigar[-1][0] == "S": length_softclip = parsed_cigar[-1][1] seq = seq[:-length_softclip] parsed_cigar = parsed_cigar[:-1] # trim hard clips of cigar if parsed_cigar[0][0] == "H": parsed_cigar = parsed_cigar[1:] if parsed_cigar[-1][0] == "H": parsed_cigar = parsed_cigar[:-1] cigar = join_cigar(parsed_cigar) return cigar, seq