my_seqtk

Author

ningyuxi

导入相关包

import random
from Bio import SeqIO
import math
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
import argparse

需要实现的功能{sample, sort, shuffle, subset}

# sample_fasta 随机抽取序列
def sample_fasta(fin,fout,num,seednum=11):
    ## 读取文件
    seq_index = SeqIO.index(fin,'fasta')
    ## 随机抽样
    random.seed(seednum)
    if num <1:
        num = math.ceil(len(seq_index) * num )
    keys = random.sample(list(seq_index.keys()),int(num))
    ## 提取子集
    seq_index_sample = {k:v for k,v in seq_index.items() if k in keys}
    ## 输出
    return SeqIO.write(seq_index_sample.values(),fout,'fasta')
    ## 
    ##return
#sample_fasta(fin,fout,2)


# sort_fasta:按序列长度排序
def sort_fasta(fin,fout,reverse):
    ## 读取文件
    seq_index = SeqIO.index(fin,'fasta')
    ## 排序
    sr_list = sorted(list(seq_index.values()), 
                    key=lambda x:len(x.seq), reverse=reverse)
    ## 写入文件
    return SeqIO.write(sr_list,fout,'fasta')
#sort_fasta(fin,fout,reverse=True)


# shuffle_fasta:打乱
def shuffle_fasta(fin,fout):
    ## 读取文件
    seq_index = SeqIO.index(fin,'fasta')
    ## 打乱
    shuffle_list = list(seq_index.values())
    random.shuffle(shuffle_list)
    ## 写入文件
    return SeqIO.write(shuffle_list,fout,'fasta')
#shuffle_fasta(fin,fout)


# sub_fasta_list:根据ID提取序列
def sub_fasta_list(fin,fout,id_list):
    ## 读取id_list文件
    with open(id_list,'r') as id_fi:
        keys = id_fi.read().strip().split('\n')
    ## 读取fa文件
    seq_index = SeqIO.index(fin,'fasta')
    sub_list = {k:v for k,v in seq_index.items() if k in keys}
    return SeqIO.write(sub_list.values(),fout,'fasta')
#sub_fasta_list(fin,fout,id_list)



# sub_fasta_by_bed: 根据ID和位置提取序列
def sub_fasta_by_bed(fin,fout,sub_bed):
    seq_index = SeqIO.index(fin,'fasta')
    sr_list = []
    with open(sub_bed,'r') as fi:
      for line in fi:
        content = line.strip().split('\t')
        acc = content[0]
        start = int(content[1])
        end = int(content[2])
        sub_sequence = seq_index[acc].seq[start-1:end]
        sr = SeqRecord(id=':'.join([acc, str(start), 
                        str(end)]), seq=sub_sequence, 
                        description=seq_index[acc].description)
        sr_list.append(sr)
    return SeqIO.write(sr_list,fout,'fasta')
#sub_fasta_by_bed(fin,fout,sub_bed)

主程序入口

parser = argparse.ArgumentParser()
parser.add_argument('subcommand',
                    type=str,
                    choices=['sample','sort','shuffle','subset'],
                    help='subcommand')
parser.add_argument('fin',
                    type=str,
                    help="input file in fasta file",
                    )
                   # required=True)
parser.add_argument('fout',
                    type=str,
                    help="output fasta file",
                    )
                    #required=True)
parser.add_argument('-num',
                    type=float,
                    help="the number of sample ")
parser.add_argument('-id',
                    type=str,
                    help='''you wanna get seq's 
                    id list(split \\t)''')
parser.add_argument('-shuffle',
                    type=str,
                    help="to shuffle fasta file"
                    )
parser.add_argument('-reverse',
                    action='store_true',
                    help="sort reverse")
parser.add_argument('-bed',
                    type=str,
                    help='''you wanna get seq's 
                    position bed file(split \\t) ''')

args = parser.parse_args()

if args.subcommand == 'sample':
    sample_fasta(args.fin,args.fout,args.num)
elif args.subcommand == 'sort':
    sort_fasta(args.fin,args.fout,args.reverse)
elif args.subcommand == 'shuffle':
    shuffle_fasta(args.fin,args.fout)
elif args.subcommand == 'subset':
    if args.id:
        sub_fasta_list(args.fin,args.fout,args.id)
    elif args.bed:
        sub_fasta_by_bed(args.fin,args.fout,args.bed)