Documentation for the utils modules

scprint.utils.sinkhorn

SinkhornDistance

Bases: Module

SinkhornDistance Initialize the SinkhornDistance class

Parameters:
  • eps (float, default: 0.01 ) –

    Regularization parameter. Defaults to 1e-2.

  • max_iter (int, default: 100 ) –

    Maximum number of Sinkhorn iterations. Defaults to 100.

  • reduction (str, default: 'none' ) –

    Specifies the reduction to apply to the output. Defaults to "none".

Source code in scprint/utils/sinkhorn.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def __init__(self, eps: float = 1e-2, max_iter: int = 100, reduction: str = "none"):
    """
    SinkhornDistance Initialize the SinkhornDistance class

    Args:
        eps (float, optional): Entropic regularization strength used in the
            modified cost M(). Defaults to 1e-2.
        max_iter (int, optional): Maximum number of Sinkhorn iterations run
            in forward(). Defaults to 100.
        reduction (str, optional): Specifies the reduction to apply to the output.
            Defaults to "none".
            NOTE(review): `reduction` is only stored here; no reduction is
            applied anywhere in the visible code -- confirm against the full class.
    """
    super(SinkhornDistance, self).__init__()
    self.eps = eps  # divisor in M(); smaller eps -> sharper transport plan
    self.max_iter = max_iter  # iteration cap for the Sinkhorn loop
    self.reduction = reduction

M

Modified cost for logarithmic updates

Source code in scprint/utils/sinkhorn.py
 99
100
101
102
def M(self, C, u, v):
    """Modified cost for logarithmic (log-domain) Sinkhorn updates.

    Computes $M_{ij} = (-c_{ij} + u_i + v_j) / \\epsilon$, broadcasting the
    dual potentials u and v against the cost matrix C.
    """
    shifted = -C + u.unsqueeze(-1) + v.unsqueeze(1)
    return shifted / self.eps

ave staticmethod

Barycenter subroutine, used by kinetic acceleration through extrapolation.

Source code in scprint/utils/sinkhorn.py
104
105
106
107
@staticmethod
def ave(u, u1, tau):
    """Barycenter subroutine, used by kinetic acceleration through extrapolation."""
    weighted_new = tau * u
    weighted_old = (1 - tau) * u1
    return weighted_new + weighted_old

forward

forward Compute the Sinkhorn distance between two measures with cost matrix c

Parameters:
  • c (Tensor) –

    The cost matrix between the two measures.

Returns:
  • tuple: (pi, C, U, V) – the transport plan, the sign-flipped cost matrix used internally, and the two dual potentials. (The scalar Sinkhorn distance itself is not returned.)

Source code in scprint/utils/sinkhorn.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def forward(self, c: torch.Tensor):
    """
    forward Compute the Sinkhorn iteration between two measures with cost matrix c

    Args:
        c (torch.Tensor): The cost matrix between the two measures.
            Assumes shape (batch, n, n): x_points = c.shape[-2] is used for
            both marginals, so the cost is treated as square -- TODO confirm.

    Returns:
        tuple: ``(pi, C, U, V)`` where ``pi`` is the transport plan
            ``exp(M(C, U, V))``, ``C = -c`` is the sign-flipped cost used
            internally, and ``U``, ``V`` are the final dual potentials.
            NOTE(review): despite the docstring of the original, the scalar
            Sinkhorn distance itself is not computed or returned here.
    """
    # All updates below operate on the sign-flipped cost.
    C = -c
    x_points = C.shape[-2]
    batch_size = C.shape[0]

    # both marginals are fixed with equal weights 1/n (uniform measures)
    mu = (
        torch.empty(
            batch_size,
            x_points,
            dtype=C.dtype,
            requires_grad=False,
            device=C.device,
        )
        .fill_(1.0 / x_points)
        .squeeze()
    )
    nu = (
        torch.empty(
            batch_size,
            x_points,
            dtype=C.dtype,
            requires_grad=False,
            device=C.device,
        )
        .fill_(1.0 / x_points)
        .squeeze()
    )
    # Dual potentials, initialized at zero.
    u = torch.zeros_like(mu)
    v = torch.zeros_like(nu)

    # Stopping criterion on the mean L1 change of u between sweeps.
    thresh = 1e-12

    # Sinkhorn iterations: u is updated on even i, v on odd i.
    for i in range(self.max_iter):
        if i % 2 == 0:
            u1 = u  # useful to check the update
            # Log-domain row update: u += eps * (log(mu) - LSE_j M(C,u,v))
            u = (
                self.eps
                * (torch.log(mu) - torch.logsumexp(self.M(C, u, v), dim=-1))
                + u
            )
            err = (u - u1).abs().sum(-1).mean()
        else:
            # Log-domain column update (transpose to sum over rows).
            v = (
                self.eps
                * (
                    torch.log(nu)
                    - torch.logsumexp(self.M(C, u, v).transpose(-2, -1), dim=-1)
                )
                + v
            )
            # NOTE(review): v is detached and entries above 9e8 are zeroed --
            # looks like an overflow guard; gradient tracking is then
            # re-enabled on v only. Confirm this is intentional (u is never
            # detached the same way).
            v = v.detach().requires_grad_(False)
            v[v > 9 * 1e8] = 0.0
            v = v.detach().requires_grad_(True)

        # err is only refreshed on even iterations; also note err would be
        # unbound here if max_iter were 0 (loop body never runs).
        if err.item() < thresh:
            break

    U, V = u, v
    # Transport plan pi = diag(a)*K*diag(b)
    pi = torch.exp(self.M(C, U, V))

    # Sinkhorn distance

    return pi, C, U, V

scprint.utils.utils

category_str2int

category_str2int converts a list of category strings to a list of category integers.

Parameters:
  • category_strs (List[str]) –

    A list of category strings to be converted.

Returns:
  • List[int]

    List[int]: A list of integers corresponding to the input category strings.

Source code in scprint/utils/utils.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def category_str2int(category_strs: List[str]) -> List[int]:
    """
    category_str2int converts a list of category strings to a list of category integers.

    Categories are numbered in sorted (lexicographic) order, so the mapping is
    deterministic across interpreter runs.

    Args:
        category_strs (List[str]): A list of category strings to be converted.

    Returns:
        List[int]: A list of integers corresponding to the input category strings.
    """
    # sorted() makes the category -> id assignment reproducible; iterating a
    # raw set varies between runs because of string hash randomization.
    name2id = {name: i for i, name in enumerate(sorted(set(category_strs)))}
    return [name2id[name] for name in category_strs]

createFoldersFor

will recursively create folders if needed until having all the folders required to save the file in this filepath

Source code in scprint/utils/utils.py
85
86
87
88
89
90
91
92
93
def createFoldersFor(filepath):
    """
    will recursively create folders if needed until having all the folders required to save the file in this filepath

    Args:
        filepath (str): Path of the file to be saved; "~" is expanded. Only
            the directory portion is created, the file itself is not touched.
    """
    folder = os.path.dirname(os.path.expanduser(filepath))
    # os.makedirs is race-safe (exist_ok) and portable, unlike splitting on a
    # hard-coded "/" separator and calling os.mkdir per component.
    if folder:
        os.makedirs(folder, exist_ok=True)

fileToList

loads an input file with a\n b\n.. into a list [a,b,..]

Parameters:
  • filename (str) –

    The path of the file to read, one element per line.

  • strconv (callable, default: lambda x: x ) –

    A function applied to each line before it is returned. Defaults to the identity.

Returns:
  • list –

    The list of (converted) lines of the file.

Source code in scprint/utils/utils.py
43
44
45
46
47
48
49
50
51
52
53
54
def fileToList(filename: str, strconv: callable = lambda x: x) -> list:
    """
    loads an input file with a\\n b\\n.. into a list [a,b,..]

    Args:
        filename (str): Path of the file to read, one element per line.
        strconv (callable, optional): Function applied to each line.
            Defaults to the identity.

    Returns:
        list: One converted element per line of the file.
    """
    with open(filename) as f:
        # splitlines() -- unlike slicing off the last character of each
        # readlines() entry -- does not truncate the final line when the
        # file has no trailing newline, and also handles \r\n endings.
        return [strconv(line) for line in f.read().splitlines()]

get_free_gpu

get_free_gpu finds the GPU with the most free memory using nvidia-smi.

Returns:
  • int

    The index of the GPU with the most free memory.

Source code in scprint/utils/utils.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def get_free_gpu():
    """
    get_free_gpu finds the GPU with the most free memory using nvidia-smi.

    Returns:
        int: The index of the GPU with the most free memory.

    Raises:
        subprocess.CalledProcessError: If nvidia-smi exits with an error.
        FileNotFoundError: If nvidia-smi is not on PATH.
    """
    import subprocess
    from io import StringIO

    import pandas as pd

    gpu_stats = subprocess.check_output(
        [
            "nvidia-smi",
            "--format=csv",
            "--query-gpu=memory.used,memory.free",
        ]
    ).decode("utf-8")
    # skiprows=1 drops the CSV header line emitted by nvidia-smi.
    gpu_df = pd.read_csv(
        StringIO(gpu_stats), names=["memory.used", "memory.free"], skiprows=1
    )
    print("GPU usage:\n{}".format(gpu_df))
    # Values look like "1234 MiB"; strip the unit suffix to get an int.
    gpu_df["memory.free"] = gpu_df["memory.free"].map(lambda x: int(x.rstrip(" [MiB]")))
    # Row label of the max; equals the GPU index since rows are in device order.
    idx = gpu_df["memory.free"].idxmax()
    print(
        "Find free GPU{} with {} free MiB".format(idx, gpu_df.iloc[idx]["memory.free"])
    )

    return idx

get_git_commit

get_git_commit gets the current git commit hash.

Returns:
  • str

    The current git commit

Source code in scprint/utils/utils.py
194
195
196
197
198
199
200
201
def get_git_commit():
    """
    get_git_commit gets the current git commit hash.

    Runs ``git rev-parse HEAD`` in the current working directory.

    Returns:
        str: The current git commit hash, with the trailing newline stripped.

    Raises:
        subprocess.CalledProcessError: If the command fails (e.g. not a git repo).
    """
    return subprocess.check_output(["git", "rev-parse", "HEAD"]).decode("utf-8").strip()

inf_loop

wrapper function for endless data loader.

Source code in scprint/utils/utils.py
222
223
224
225
def inf_loop(data_loader):
    """Endlessly re-iterate *data_loader*, yielding its items in a cycle."""
    while True:
        yield from data_loader

isnotebook

check whether executing in jupyter notebook.

Source code in scprint/utils/utils.py
111
112
113
114
115
116
117
118
119
120
121
122
def isnotebook() -> bool:
    """check whether executing in jupyter notebook.

    Returns:
        bool: True for Jupyter/qtconsole AND terminal IPython, False otherwise.
            NOTE(review): returning True for TerminalInteractiveShell
            contradicts the function name -- terminal IPython is not a
            notebook. Confirm callers actually want "any IPython shell".
    """
    try:
        # get_ipython() exists only inside an IPython kernel/shell.
        shell = get_ipython().__class__.__name__
        if shell == "ZMQInteractiveShell":
            return True  # Jupyter notebook or qtconsole
        elif shell == "TerminalInteractiveShell":
            return True  # Terminal running IPython
        else:
            return False  # Other type (?)
    except NameError:
        return False  # Probably standard Python interpreter

listToFile

listToFile writes a list [a,b,..] to a file as a\n b\n..

Parameters:
  • l (list) –

    The list of elements to be written to the file.

  • filename (str) –

    The name of the file where the list will be written.

  • strconv (callable, default: lambda x: str(x) ) –

    A function to convert each element of the list to a string. Defaults to str.

Returns:
  • None

    None

Source code in scprint/utils/utils.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def listToFile(li: list, filename: str, strconv: callable = lambda x: str(x)) -> None:
    """
    listToFile writes a list [a,b,..] to a file as a\\n b\\n..

    Args:
        li (list): The list of elements to be written to the file.
        filename (str): The name of the file where the list will be written.
        strconv (callable, optional): A function to convert each element of the list to a string. Defaults to str.

    Returns:
        None
    """
    with open(filename, "w") as out:
        out.writelines("%s\n" % strconv(elem) for elem in li)

load_genes

load_genes loads the genes for a given organism.

Parameters:
  • organisms (Union[str, list], default: 'NCBITaxon:9606' ) –

    A string or list of strings representing the organism(s) to load genes for. Defaults to "NCBITaxon:9606".

Returns:
  • pd.DataFrame: A DataFrame containing gene information for the specified organism(s).

Source code in scprint/utils/utils.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def load_genes(organisms: Union[str, list] = "NCBITaxon:9606"):  # "NCBITaxon:10090",
    """
    load_genes loads the genes for a given organism.

    Args:
        organisms (Union[str, list], optional): A string or list of strings representing the organism(s) to load genes for. Defaults to "NCBITaxon:9606".

    Returns:
        pd.DataFrame: A DataFrame containing gene information for the specified
            organism(s), indexed by ensembl_gene_id, with added boolean columns
            mt / ribo / hb and an "organism" column.
    """
    organismdf = []
    if type(organisms) is str:
        organisms = [organisms]
    for organism in organisms:
        # NOTE(review): `bt` is presumably bionty/lamindb -- queries the gene
        # registry by the organism's ontology id. Verify against the imports.
        genesdf = bt.Gene.filter(
            organism_id=bt.Organism.filter(ontology_id=organism).first().id
        ).df()
        # Keep only genes that have a public source annotation.
        genesdf = genesdf[~genesdf["public_source_id"].isna()]
        genesdf = genesdf.drop_duplicates(subset="ensembl_gene_id")
        genesdf = genesdf.set_index("ensembl_gene_id").sort_index()
        # mitochondrial genes (symbol prefix "MT-")
        genesdf["mt"] = genesdf.symbol.astype(str).str.startswith("MT-")
        # ribosomal genes (symbol prefix "RPS"/"RPL")
        genesdf["ribo"] = genesdf.symbol.astype(str).str.startswith(("RPS", "RPL"))
        # hemoglobin genes (symbol matches HB but not HBP)
        genesdf["hb"] = genesdf.symbol.astype(str).str.contains(("^HB[^(P)]"))
        genesdf["organism"] = organism
        organismdf.append(genesdf)
    organismdf = pd.concat(organismdf)
    # Drop bookkeeping columns from the registry before returning.
    organismdf.drop(
        columns=["source_id", "stable_id", "run_id", "created_by_id", "updated_at"],
        inplace=True,
    )
    return organismdf

prepare_device

setup GPU device if available. get gpu device indices which are used for DataParallel

Source code in scprint/utils/utils.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def prepare_device(n_gpu_use):
    """
    setup GPU device if available. get gpu device indices which are used for DataParallel

    Args:
        n_gpu_use (int): Number of GPUs requested.

    Returns:
        tuple: (torch.device, list[int]) -- the master device and the GPU
            indices to hand to DataParallel (empty when running on CPU).
    """
    available = torch.cuda.device_count()
    # Fall back to CPU when no GPU exists at all.
    if n_gpu_use > 0 and available == 0:
        print(
            "Warning: There's no GPU available on this machine,"
            "training will be performed on CPU."
        )
        n_gpu_use = 0
    # Clamp the request to what the machine actually has.
    if n_gpu_use > available:
        print(
            f"Warning: The number of GPU's configured to use is {n_gpu_use}, but only {available} are "
            "available on this machine."
        )
        n_gpu_use = available
    device = torch.device("cuda:0") if n_gpu_use > 0 else torch.device("cpu")
    return device, list(range(n_gpu_use))

run_command

run_command runs a command in the shell and prints the output.

Parameters:
  • command (str) –

    The command to be executed in the shell.

Returns:
  • int

    The return code of the command executed.

Source code in scprint/utils/utils.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def run_command(command: str, **kwargs):
    """
    run_command runs a command in the shell and prints the output.

    Args:
        command (str): The command to be executed in the shell. Passed
            straight to subprocess.Popen, so without shell=True in kwargs it
            may also be a list of argv tokens.
        **kwargs: Extra keyword arguments forwarded to subprocess.Popen.

    Returns:
        int: The return code of the command executed.
    """
    process = subprocess.Popen(command, stdout=subprocess.PIPE, **kwargs)
    while True:
        output = process.stdout.readline()
        if output:
            print(output.strip())
        # Only stop once the pipe is drained AND the process has exited;
        # the previous version broke on exit first and could drop output
        # still buffered in the pipe.
        elif process.poll() is not None:
            break
    # wait() reaps the child and returns its exit code.
    return process.wait()

set_seed

set random seed.

Source code in scprint/utils/utils.py
74
75
76
77
78
79
80
def set_seed(seed: int = 42):
    """Seed the Python, NumPy and PyTorch RNGs and force deterministic cuDNN.

    Args:
        seed (int, optional): Seed shared by all three generators. Defaults to 42.
    """
    # The three generators are independent, so seeding order is irrelevant.
    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)
    # Trade speed for reproducibility in cuDNN convolution algorithms.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

scprint.utils.get_seq

load_fasta_species

Downloads and caches FASTA files for a given species from the Ensembl FTP server.

Parameters:
  • species (str, default: 'homo_sapiens' ) –

    The species name for which to download FASTA files. Defaults to "homo_sapiens".

  • output_path (str, default: '/tmp/data/fasta/' ) –

    The local directory path where the FASTA files will be saved. Defaults to "/tmp/data/fasta/".

  • cache (bool, default: True ) –

    If True, use cached files if they exist. If False, re-download the files. Defaults to True.

Source code in scprint/utils/get_seq.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def load_fasta_species(
    species: str = "homo_sapiens",
    output_path: str = "/tmp/data/fasta/",
    cache: bool = True,
) -> None:
    """
    Downloads and caches FASTA files for a given species from the Ensembl FTP server.

    Fetches the peptide (.all.fa.gz) and ncRNA (.ncrna.fa.gz) files from
    Ensembl release 110.

    Args:
        species (str, optional): The species name for which to download FASTA files. Defaults to "homo_sapiens".
        output_path (str, optional): The local directory path where the FASTA files will be saved. Defaults to "/tmp/data/fasta/".
        cache (bool, optional): If True, use cached files if they exist. If False, re-download the files. Defaults to True.
    """
    ftp = ftplib.FTP("ftp.ensembl.org")
    ftp.login()
    # Peptide FASTA: first file matching ".all.fa.gz" in the pep/ directory.
    ftp.cwd("/pub/release-110/fasta/" + species + "/pep/")
    file = list_files(ftp, ".all.fa.gz")[0]
    local_file_path = output_path + file
    if not os.path.exists(local_file_path) or not cache:
        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
        with open(local_file_path, "wb") as local_file:
            ftp.retrbinary("RETR " + file, local_file.write)
    # ncRNA FASTA from the same release.
    ftp.cwd("/pub/release-110/fasta/" + species + "/ncrna/")
    file = list_files(ftp, ".ncrna.fa.gz")[0]
    local_file_path = output_path + file
    # NOTE(review): no makedirs here -- this branch relies on the output
    # directory already existing (e.g. created by the pep download above);
    # confirm this cannot be reached with a missing directory.
    if not os.path.exists(local_file_path) or not cache:
        with open(local_file_path, "wb") as local_file:
            ftp.retrbinary("RETR " + file, local_file.write)
    ftp.quit()

seq

Fetch nucleotide or amino acid sequence (FASTA) of a gene (and all its isoforms) or transcript by Ensembl, WormBase, or FlyBase ID.

Parameters:
  • ens_ids (Union[str, List[str]]) –

    One or more Ensembl IDs (passed as string or list of strings). Also supports WormBase and FlyBase IDs.

  • translate (bool, default: False ) –

    Defines whether nucleotide or amino acid sequences are returned. Defaults to False (returns nucleotide sequences). Nucleotide sequences are fetched from the Ensembl REST API server. Amino acid sequences are fetched from the UniProt REST API server.

  • isoforms (bool, default: False ) –

    If True, returns the sequences of all known transcripts. Defaults to False. (Only for gene IDs.)

  • parallel (bool, default: True ) –

    If True, fetches sequences in parallel. Defaults to True.

  • save (bool, default: False ) –

    If True, saves output FASTA to current directory. Defaults to False.

  • transcribe (bool, default: None ) –

    Deprecated. Use 'translate' instead.

  • seqtype (str, default: None ) –

    Deprecated. Use 'translate' instead.

  • verbose (bool, default: True ) –

    If True, prints progress information. Defaults to True.

Returns:
  • List[str]

    List[str]: A list containing the requested sequences, or a FASTA file if 'save' is True.

Raises:
  • ValueError

    If an invalid Ensembl ID is provided.

Source code in scprint/utils/get_seq.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
def seq(
    ens_ids: Union[str, List[str]],
    translate: bool = False,
    isoforms: bool = False,
    parallel: bool = True,
    save: bool = False,
    transcribe: Optional[bool] = None,
    seqtype: Optional[str] = None,
    verbose: bool = True,
) -> List[str]:
    """
    Fetch nucleotide or amino acid sequence (FASTA) of a gene (and all its isoforms) or transcript by Ensembl, WormBase, or FlyBase ID.

    Args:
        ens_ids (Union[str, List[str]]): One or more Ensembl IDs (passed as string or list of strings).
                                         Also supports WormBase and FlyBase IDs.
        translate (bool, optional): Defines whether nucleotide or amino acid sequences are returned.
                                    Defaults to False (returns nucleotide sequences).
                                    Nucleotide sequences are fetched from the Ensembl REST API server.
                                    Amino acid sequences are fetched from the UniProt REST API server.
        isoforms (bool, optional): If True, returns the sequences of all known transcripts. Defaults to False.
                                   (Only for gene IDs.)
        parallel (bool, optional): If True, fetches sequences in parallel. Defaults to True.
        save (bool, optional): If True, saves output FASTA to current directory. Defaults to False.
        transcribe (bool, optional): Deprecated. Use 'translate' instead.
        seqtype (str, optional): Deprecated. Use 'translate' instead.
        verbose (bool, optional): If True, prints progress information. Defaults to True.

    Returns:
        List[str]: A list containing the requested sequences, or a FASTA file if 'save' is True.

    Raises:
        ValueError: If an invalid Ensembl ID is provided.
    """
    # Handle deprecated arguments
    if seqtype:
        logging.error(
            "'seqtype' argument deprecated! Please use True/False argument 'translate' instead."
        )
        return
    if transcribe:
        translate = transcribe

    ## Clean up arguments
    # Clean up Ensembl IDs
    # If single Ensembl ID passed as string, convert to list
    if type(ens_ids) is str:
        ens_ids = [ens_ids]
    # Remove Ensembl ID version if passed
    ens_ids_clean = []
    temp = 0
    for ensembl_ID in ens_ids:
        # But only for Ensembl ID (and not for flybase/wormbase IDs)
        if ensembl_ID.startswith("ENS"):
            ens_ids_clean.append(ensembl_ID.split(".")[0])

            if "." in ensembl_ID and temp == 0:
                if verbose:
                    logging.info(
                        "We noticed that you may have passed a version number with your Ensembl ID.\n"
                        "Please note that gget seq will return information linked to the latest Ensembl ID version."
                    )
                temp = +1

        else:
            ens_ids_clean.append(ensembl_ID)

    # Initiate empty 'fasta'
    fasta = []

    ## Fetch nucleotide sequece
    if translate is False:
        # Define Ensembl REST API server
        server = ENSEMBL_REST_API
        # Define type of returned content from REST
        content_type = "application/json"

        # Initiate dictionary to save results for all IDs in
        master_dict = {}

        # Query REST APIs from https://rest.ensembl.org/
        for ensembl_ID in ens_ids_clean:
            # Create dict to save query results
            results_dict = {ensembl_ID: {}}

            # If isoforms False, just fetch sequences of passed Ensembl ID
            if isoforms is False:
                # sequence/id/ query: Request sequence by stable identifier
                query = "sequence/id/" + ensembl_ID + "?"

                # Try if query valid
                try:
                    # Submit query; this will throw RuntimeError if ID not found
                    df_temp = rest_query(server, query, content_type)

                    # Delete superfluous entries
                    keys_to_delete = ["query", "id", "version", "molecule"]
                    for key in keys_to_delete:
                        # Pop keys, None -> do not raise an error if key to delete not found
                        df_temp.pop(key, None)

                    # Add results to main dict
                    results_dict[ensembl_ID].update({"seq": df_temp})

                    if verbose:
                        logging.info(
                            f"Requesting nucleotide sequence of {ensembl_ID} from Ensembl."
                        )

                except RuntimeError:
                    logging.error(
                        f"ID {ensembl_ID} not found. Please double-check spelling/arguments and try again."
                    )

            # If isoforms true, fetch sequences of isoforms instead
            if isoforms is True:
                # Get ID type (gene, transcript, ...) using gget info
                info_df = info(
                    ensembl_ID, verbose=False, pdb=False, ncbi=False, uniprot=False
                )

                # Check if Ensembl ID was found
                if isinstance(info_df, type(None)):
                    logging.warning(
                        f"ID '{ensembl_ID}' not found. Please double-check spelling/arguments and try again."
                    )
                    continue

                ens_ID_type = info_df.loc[ensembl_ID]["object_type"]

                # If the ID is a gene, get the IDs of all its transcripts
                if ens_ID_type == "Gene":
                    if verbose:
                        logging.info(
                            f"Requesting nucleotide sequences of all transcripts of {ensembl_ID} from Ensembl."
                        )

                    for transcipt_id in info_df.loc[ensembl_ID]["all_transcripts"]:
                        # Remove version number for Ensembl IDs (not for flybase/wormbase IDs)
                        if transcipt_id.startswith("ENS"):
                            transcipt_id = transcipt_id.split(".")[0]

                        # Try if query is valid
                        try:
                            # Define the REST query
                            query = "sequence/id/" + transcipt_id + "?"
                            # Submit query
                            df_temp = rest_query(server, query, content_type)

                            # Delete superfluous entries
                            keys_to_delete = ["query", "version", "molecule"]
                            for key in keys_to_delete:
                                # Pop keys, None -> do not raise an error if key to delete not found
                                df_temp.pop(key, None)

                            # Add results to main dict
                            results_dict[ensembl_ID].update(
                                {f"{transcipt_id}": df_temp}
                            )

                        except RuntimeError:
                            logging.error(
                                f"ID {transcipt_id} not found. "
                                "Please double-check spelling/arguments and try again."
                            )

                # If isoform true, but ID is not a gene; ignore the isoform parameter
                else:
                    # Try if query is valid
                    try:
                        # Define the REST query
                        query = "sequence/id/" + ensembl_ID + "?"

                        # Submit query
                        df_temp = rest_query(server, query, content_type)

                        # Delete superfluous entries
                        keys_to_delete = ["query", "id", "version", "molecule"]
                        for key in keys_to_delete:
                            # Pop keys, None -> do not raise an error if key to delete not found
                            df_temp.pop(key, None)

                        # Add results to main dict
                        results_dict[ensembl_ID].update({"seq": df_temp})

                        logging.info(
                            f"Requesting nucleotide sequence of {ensembl_ID} from Ensembl."
                        )
                        logging.warning("The isoform option only applies to gene IDs.")

                    except RuntimeError:
                        logging.error(
                            f"ID {ensembl_ID} not found. "
                            "Please double-check spelling/arguments and try again."
                        )

            # Add results to master dict
            master_dict.update(results_dict)

        # Build FASTA file
        for ens_ID in master_dict:
            for key in master_dict[ens_ID].keys():
                if key == "seq":
                    fasta.append(">" + ens_ID + " " + master_dict[ens_ID][key]["desc"])
                    fasta.append(master_dict[ens_ID][key]["seq"])
                else:
                    fasta.append(
                        ">"
                        + master_dict[ens_ID][key]["id"]
                        + " "
                        + master_dict[ens_ID][key]["desc"]
                    )
                    fasta.append(master_dict[ens_ID][key]["seq"])

    ## Fetch amino acid sequences from UniProt
    if translate is True:
        if isoforms is False:
            # List to collect transcript IDs
            trans_ids = []

            # Get ID type (gene, transcript, ...) using gget info
            info_df = info(
                ens_ids_clean, verbose=False, pdb=False, ncbi=False, uniprot=False
            )

            # Check that Ensembl ID was found
            missing = set(ens_ids_clean) - set(info_df.index.values)
            if len(missing) > 0:
                logging.warning(
                    f"{str(missing)} IDs not found. Please double-check spelling/arguments."
                )

            ens_ID_type = info_df.loc[ens_ids_clean[0]]["object_type"]

            # If the ID is a gene, use the ID of its canonical transcript
            if ens_ID_type == "Gene":
                # Get ID of canonical transcript
                for ensembl_ID in info_df.index.values:
                    can_trans = info_df.loc[ensembl_ID]["canonical_transcript"]

                    if ensembl_ID.startswith("ENS"):
                        # Remove Ensembl ID version from transcript IDs and append to transcript IDs list
                        temp_trans_id = can_trans.split(".")[0]
                        trans_ids.append(temp_trans_id)

                    elif ensembl_ID.startswith("WB"):
                        # Remove added "." at the end of transcript IDs
                        temp_trans_id = ".".join(can_trans.split(".")[:-1])
                        # # For WormBase transcript IDs, also remove the version number for submission to UniProt API
                        # temp_trans_id = ".".join(temp_trans_id1.split(".")[:-1])
                        trans_ids.append(temp_trans_id)

                    else:
                        # Remove added "." at the end of other transcript IDs
                        temp_trans_id = ".".join(can_trans.split(".")[:-1])
                        trans_ids.append(temp_trans_id)

                    if verbose:
                        logging.info(
                            f"Requesting amino acid sequence of the canonical transcript {temp_trans_id} of gene {ensembl_ID} from UniProt."
                        )

            # If the ID is a transcript, append the ID directly
            elif ens_ID_type == "Transcript":
                # # For WormBase transcript IDs, remove the version number for submission to UniProt API
                # if ensembl_ID.startswith("T"):
                #     trans_ids.append(".".join(ensembl_ID.split(".")[:-1]))
                # else:
                trans_ids = ens_ids_clean

                if verbose:
                    logging.info(
                        f"Requesting amino acid sequence of {trans_ids} from UniProt."
                    )

            else:
                logging.warning(
                    "ensembl_IDs not recognized as either a gene or transcript ID. It will not be included in the UniProt query."
                )

            # Fetch the amino acid sequences of the transcript Ensembl IDs
            df_uniprot = get_uniprot_seqs(UNIPROT_REST_API, trans_ids)
            # Add info_df.loc[ensembl_ID] to df_uniprot by joining on "canonical_transcript" / "gene_name" respectively
            import pdb

            pdb.set_trace()
            info_df.set_index("canonical_transcript", inplace=True)

            df_uniprot.loc[:, "gene_id"] = info_df.loc[
                df_uniprot["query"], "gene_name"
            ].values

        if isoforms is True:
            # List to collect transcript IDs
            trans_ids = []

            for ensembl_ID in ens_ids_clean:
                # Get ID type (gene, transcript, ...) using gget info
                info_df = info(
                    ensembl_ID, verbose=False, pdb=False, ncbi=False, uniprot=False
                )

                # Check that Ensembl ID was found
                if isinstance(info_df, type(None)):
                    logging.warning(
                        f"ID '{ensembl_ID}' not found. Please double-check spelling/arguments."
                    )
                    continue

                ens_ID_type = info_df.loc[ensembl_ID]["object_type"]

                # If the ID is a gene, get the IDs of all isoforms
                if ens_ID_type == "Gene":
                    # Get the IDs of all transcripts from the gget info results
                    for transcipt_id in info_df.loc[ensembl_ID]["all_transcripts"]:
                        if ensembl_ID.startswith("ENS"):
                            # Append transcript ID (without Ensembl version number) to list of transcripts to fetch
                            trans_ids.append(transcipt_id.split(".")[0])

                        # elif ensembl_ID.startswith("WB"):
                        #     # For WormBase transcript IDs, remove the version number for submission to UniProt API
                        #     temp_trans_id = ".".join(transcipt_id.split(".")[:-1])
                        #     trans_ids.append(temp_trans_id)

                        else:
                            # Note: No need to remove the added "." at the end of unversioned transcripts here, because "all_transcripts" are returned without it
                            trans_ids.append(transcipt_id)

                    if verbose:
                        logging.info(
                            f"Requesting amino acid sequences of all transcripts of gene {ensembl_ID} from UniProt."
                        )

                elif ens_ID_type == "Transcript":
                    # # For WormBase transcript IDs, remove the version number for submission to UniProt API
                    # if ensembl_ID.startswith("T"):
                    #     trans_ids.append(".".join(ensembl_ID.split(".")[:-1]))

                    # else:
                    trans_ids.append(ensembl_ID)

                    if verbose:
                        logging.info(
                            f"Requesting amino acid sequence of {ensembl_ID} from UniProt."
                        )
                    logging.warning("The isoform option only applies to gene IDs.")

                else:
                    logging.warning(
                        f"{ensembl_ID} not recognized as either a gene or transcript ID. It will not be included in the UniProt query."
                    )

            # Fetch amino acid sequences of all isoforms from the UniProt REST API
            df_uniprot = get_uniprot_seqs(UNIPROT_REST_API, trans_ids)

        # Check if any results were found
        if len(df_uniprot) < 1:
            logging.error("No UniProt amino acid sequences were found for these ID(s).")

        else:
            # Build FASTA file from UniProt results
            for (
                uniprot_id,
                query_ensembl_id,
                gene_name,
                organism,
                sequence_length,
                uniprot_seq,
            ) in zip(
                df_uniprot["uniprot_id"].values,
                df_uniprot["query"].values,
                df_uniprot["gene_name"].values,
                df_uniprot["gene_id"].values,
                df_uniprot["organism"].values,
                df_uniprot["sequence_length"].values,
                df_uniprot["sequence"].values,
            ):
                fasta.append(
                    ">"
                    + str(query_ensembl_id)
                    + " uniprot_id: "
                    + str(uniprot_id)
                    + " ensembl_id: "
                    + str(query_ensembl_id)
                    + " gene_name: "
                    + str(gene_name)
                    + " organism: "
                    + str(organism)
                    + " sequence_length: "
                    + str(sequence_length)
                )
                fasta.append(str(uniprot_seq))

    # Save
    if save:
        file = open("gget_seq_results.fa", "w")
        for element in fasta:
            file.write(element + "\n")
        file.close()
        # missed samples
        return (set(trans_ids) - set(df_uniprot["query"].values)) | set(missing)

    return fasta

subset_fasta

subset_fasta: creates a new FASTA file containing only the sequences whose names match one of gene_names

Parameters:
  • gene_tosubset (set) –

    A set of gene names to subset from the original FASTA file.

  • fasta_path (str) –

    The path to the original FASTA file.

  • subfasta_path (str, default: './data/fasta/subset.fa' ) –

    The path to save the subsetted FASTA file. Defaults to "./data/fasta/subset.fa".

  • drop_unknown_seq (bool, default: True ) –

    If True, drop sequences containing a '*' character (stop codon or unknown residue). Defaults to True.

Returns:
  • set( set ) –

    A set of gene names that were found and included in the subsetted FASTA file.

Raises:
  • ValueError

    If a gene name does not start with "ENS".

Source code in scprint/utils/get_seq.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def subset_fasta(
    gene_tosubset: set,
    fasta_path: str,
    subfasta_path: str = "./data/fasta/subset.fa",
    drop_unknown_seq: bool = True,
) -> set:
    """
    subset_fasta: creates a new FASTA file containing only the sequences whose
    names match one of the given gene names.

    Args:
        gene_tosubset (set): A set of Ensembl gene IDs to subset from the original FASTA file.
        fasta_path (str): The path to the original FASTA file.
        subfasta_path (str, optional): The path to save the subsetted FASTA file. Defaults to "./data/fasta/subset.fa".
        drop_unknown_seq (bool, optional): If True, drop sequences containing a '*'
            character (stop codon / unknown residue in the translated sequence).
            Defaults to True.

    Returns:
        set: The gene names that were found and written to the subsetted FASTA file.

    Raises:
        ValueError: If a matched gene name does not start with "ENS".
    """
    duplicates = set()
    dropped = 0
    genes_found = set()
    gene_tosubset = set(gene_tosubset)
    # NOTE: the output handle is named `subset_file` — the original bound it to
    # `subset_fasta`, shadowing this function's own name.
    with open(fasta_path, "r") as original_fasta, open(
        subfasta_path, "w"
    ) as subset_file:
        for record in SeqIO.parse(original_fasta, "fasta"):
            # Header is assumed to look like
            # "... gene:<ENSG...>.<version> transcript..." — TODO confirm
            # against the FASTA source; records without " gene:" would raise.
            gene_name = (
                record.description.split(" gene:")[1]
                .split(" transcript")[0]
                .split(".")[0]
            )
            if gene_name not in gene_tosubset:
                continue
            if drop_unknown_seq and "*" in record.seq:
                dropped += 1
                continue
            if not gene_name.startswith("ENS"):
                raise ValueError("issue", gene_name)
            if gene_name in genes_found:
                # Keep only the first sequence seen per gene.
                duplicates.add(gene_name)
                continue
            # Re-key the record by its gene ID before writing it out.
            record.description = ""
            record.id = gene_name
            SeqIO.write(record, subset_file, "fasta")
            genes_found.add(gene_name)
    print(len(duplicates), " genes had duplicates")
    print("dropped", dropped, "weird sequences")
    return genes_found