#!/usr/bin/env python3 """ A simple parser for the GFF3 format. Test with transcripts.gff3 from http://www.broadinstitute.org/annotation/gebo/help/gff3.html. Format specification source: http://www.sequenceontology.org/gff3.shtml Version 1.1: Python3 ready """ import sys from collections import namedtuple import gzip import urllib.request, urllib.parse, urllib.error import FastaStream __author__ = "Uli Köhler" __license__ = "Apache License v2.0" __version__ = "1.1" #Initialized GeneInfo named tuple. Note: namedtuple is immutable gffInfoFields = ["seqid", "source", "type", "start", "end", "score", "strand", "phase", "attributes"] GFFRecord = namedtuple("GFFRecord", gffInfoFields) def parseGFFAttributes(attributeString): """Parse the GFF3 attribute column and return a dict"""# if attributeString == ".": return {} ret = {} for attribute in attributeString.split(";"): key, value = attribute.split("=") ret[urllib.parse.unquote(key)] = urllib.parse.unquote(value) return ret def parseGFF3(filename): """ A minimalistic GFF3 format parser. Yields objects that contain info about a single GFF3 feature. Supports transparent gzip decompression. """ #Parse with transparent decompression openFunc = gzip.open if filename.endswith(".gz") else open with openFunc(filename) as infile: skip = False for line in infile: if line.strip() == "##FASTA": skip = True yield infile # beyond here, fasta sequence data is found # so change to fastastream read if skip: continue if line.startswith("#"): continue parts = line.strip().split("\t") #If this fails, the file format is not standard-compatible # print(len(parts),len(gffInfoFields)) if len(parts) != len(gffInfoFields): print(parts) continue assert len(parts) == len(gffInfoFields) #Normalize data normalizedInfo = { "seqid": "" if parts[0] == "." else urllib.parse.unquote(parts[0]), "source": "" if parts[1] == "." else urllib.parse.unquote(parts[1]), "type": "" if parts[2] == "." else urllib.parse.unquote(parts[2]), "start": "" if parts[3] == "." else int(parts[3]), "end": "" if parts[4] == "." else int(parts[4]), "score": "" if parts[5] == "." else float(parts[5]), "strand": "" if parts[6] == "." else urllib.parse.unquote(parts[6]), "phase": "" if parts[7] == "." else urllib.parse.unquote(parts[7]), "attributes": parseGFFAttributes(parts[8]) } #Alternatively, you can emit the dictionary here, if you need mutability: yield normalizedInfo #yield GFFRecord(**normalizedInfo) def retrieve(key,d,nokey_value=''): if key in d.keys(): return d[key] else: return nokey_value def reportDict(recordl): #akeys = set() akeyl = ["Dbxref", "ID", "Name", "Ontology_term", "Target", "date", "md5", "signature_desc", "status"] print("\t".join(["seqid", "xstart", "xend", "source", "name", "signature_desc"])) for d in recordl: keys = set(d.keys()) att = d['attributes'] signature_desc = retrieve('signature_desc',att) #for key in att.keys(): # akeys.add(key) name = retrieve('Name',att) id = retrieve('ID',att) target = retrieve('Target',att) dbxref = retrieve('Dbxref',att,nokey_value="nodb") xend = d['end'] xstart = d['start'] seqid = d['seqid'] source = d['source'] xtype = d['type'] outl = [seqid,xstart,xend,source, name, signature_desc] print("\t".join(["%s" % x for x in outl])) def reportGffRecord(recordl): for x in recordl: keyset = set() for key in x.attributes.keys(): keyset.add(key) attributel = sorted(list(keyset)) # Dbxref ID Name Target date status" print("\t".join(["ID", "name", "target", "dbxref", "seqid", "type", "source", "start", "end", "score"])) for x in recordl: att = x.attributes ID = att['ID'] #print(dir(att)) if name in att.keys(): name = att['Name'] else: name = "none" if name in att.keys(): target = att['target'] else: target = "" if 'Dbxref' in att.keys(): dbxref = att['Dbxref'] else: dbxref = "none" outline = [ID,name,target,dbxref,x.seqid, x.type, x.source, x.start, x.end, x.score] print("\t".join(["%s" % x for x in outline])) if __name__ == "__main__": import argparse parser = argparse.ArgumentParser() parser.add_argument("file", help="The GFF3 input file (.gz allowed)") parser.add_argument("--print-records", action="store_true", help="Print all GeneInfo objects, not only") parser.add_argument("--filter-type", help="Ignore records not having the given type") parser.add_argument("--source", help="Ignore records not having the given source") args = parser.parse_args() #Execute the parser recordCount = 0 recordl = [] for record in parseGFF3(args.file): if recordCount == 0: rtype = type(record) #print(type(record)) if type(record) != rtype: # the rest of the data is available from FastaStream infile = record infile.seek(0) for line in infile: if line.strip() == "##FASTA": break fastadata = infile.read() fastaseqs = fastadata.split(">")[1:] seql = [] for rec in fastaseqs: lines = rec.split("\n") name,seq = lines[:2] seql.append((name,seq)) break recordl.append(record) #Apply filter, if any if args.filter_type and record.type and source.type != args.filter_type: continue #Print record if specified by the user if args.print_records: print(record) #Access attributes like this: my_strand = record.strand recordCount += 1 print("Total records: %d" % recordCount) print("sequences: %s" % len(seql)) reportDict(recordl) #reportGffRecord(recordl)
Run
Reset
Share
Import
Link
Embed
Language▼
English
中文
Python Fiddle
Python Cloud IDE
Follow @python_fiddle
Browser Version Not Supported
Due to Python Fiddle's reliance on advanced JavaScript techniques, older browsers might have problems running it correctly. Please download the latest version of your favourite browser.
Chrome 10+
Firefox 4+
Safari 5+
IE 10+
Let me try anyway!
url:
Go
Python Snippet
Stackoverflow Question