-
Notifications
You must be signed in to change notification settings - Fork 28
Expand file tree
/
Copy pathintarna_pvalue.py
More file actions
309 lines (266 loc) · 14 KB
/
intarna_pvalue.py
File metadata and controls
309 lines (266 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
#!/usr/bin/env python3
# Copyright 2019
# Author: Fabio Gutmann <https://github.com/fabio-gut>
from subprocess import PIPE, Popen, run
import random
import os
import argparse
import sys
from typing import List, Tuple
import numpy as np
from scipy.integrate import quad as integ
from scipy.stats import norm as gauss
from scipy.stats import genextreme as gev
from scipy.stats import gumbel_l as gum
from intarnapvalue import dinucl_shuffle
class IntaRNApvalue:
def __init__(self, test_args=None):
self.bin = self.find_binary()
self.query = ''
self.target = ''
self.samples = 0
self.shuffle_query = False
self.shuffle_target = False
self.threads = ''
self.dist = ''
self.output = ''
self.parameterFile = ''
self.process_cmd_args(test_args)
if not test_args:
self.main()
def main(self) -> None:
"""The main function"""
if self.output == 'scores': # output scores and exit the process
scores, non_interactions = self.get_scores()
print('\n'.join(iter([str(x) for x in scores])))
sys.exit(0)
original_score = self.get_single_score(self.query, self.target)
if not original_score: # exit if given seq has no interaction and not scores output only mode
print('The query/target combination you specified has no favorable interaction')
sys.exit(1)
scores, non_interactions = self.get_scores()
pvalue = 0.0
if self.dist == 'gauss':
pvalue = self.calculate_pvalue_gauss(original_score, scores)
elif self.dist == 'none':
pvalue = self.calculate_pvalue_empirical(original_score, scores)
elif self.dist == 'gumbel':
pvalue = self.calculate_pvalue_gumbel(original_score, scores)
elif self.dist == 'gev':
pvalue = self.calculate_pvalue_gev(original_score, scores)
print(pvalue)
@staticmethod
def find_binary() -> str:
"""Tries to find the IntaRNA executable and returns path to binary or exits with error code 1 if not found"""
if not run('IntaRNA --version', shell=True, stdout=PIPE, stderr=PIPE).returncode:
#print(run('IntaRNA --version', shell=True, stdout=PIPE).stdout)
return 'IntaRNA'
# if binary not found in path, search in parent of this script recursively
bin_name = 'IntaRNA.exe' if os.name == 'nt' else 'IntaRNA'
for dir_path, dir_name, file_names in os.walk(os.path.abspath(os.path.join(os.curdir, '..'))):
if bin_name in file_names:
#print(run(os.path.join(dir_path, bin_name)+' --version', shell=True, stdout=PIPE).stdout)
return os.path.join(dir_path, bin_name)
print('Error: Cannot find IntaRNA binary executable, please add it to your PATH')
sys.exit(1)
def process_cmd_args(self, test_args=None) -> None:
"""Processes all commandline args and sets them as instance variables.
>>> i = IntaRNApvalue(['-q', 'AGGAUG', '-t', 'UUUAUCGUU', '-s', '10', '-m', 'b', '-d', 'gauss',
... '--threads', '3'])
>>> i.query
'AGGAUG'
>>> i.target
'UUUAUCGUU'
>>> i.samples
10
>>> i.shuffle_query
True
>>> i.shuffle_target
True
>>> i.threads
'3'
>>> i.dist
'gauss'
>>> i.output
'pvalue'
>>> i = IntaRNApvalue(['-q', 'Z', '-t', 'UUUAUCGUU', '-s', '10', '-m', 'b', '--threads', '3'])
Traceback (most recent call last):
...
SystemExit: 1
>>> open('test.fasta', 'w').write('>someseq\\nGACU')
13
>>> i = IntaRNApvalue(['-q', 'test.fasta', '-t', 'UUUAUCGUU', '-s', '10', '-m', 'b', '--threads', '3'])
>>> i.query
'GACU'
>>> os.remove('test.fasta')
"""
parser = argparse.ArgumentParser(description='Calculates p-values of IntaRNA energy scores.')
parser.add_argument('-q', '--query', dest='query', type=str, help='Query sequence', required=True)
parser.add_argument('-t', '--target', dest='target', type=str, help='Target sequence', required=True)
parser.add_argument('-s', '--samples', dest='samples', type=int, required=True,
help='How many sequence pairs are randomly permuted and considered for p-value calculation.')
parser.add_argument('-m', '--shuffle-mode', dest='shuffleMode', required=True, choices=['q', 't', 'b'],
help='Which sequences are going to be shuffled: both, query only or target only.')
parser.add_argument('-p', '--parameterFile', dest='parameterFile', default="",
help='Optional parameter file for IntaRNA provide further IntaRNA parameters and prediction constraints.')
parser.add_argument('-d', '--distribution', dest='dist', choices=['gev', 'gumbel', 'gauss'], default='gev',
help='[gev] Which distribution is fitted and used to calculate the p-value.')
parser.add_argument('-o', '--output', dest='output', choices=['pvalue', 'scores'], default='pvalue',
help='[pvalue] If set to "scores", outputs all IntaRNA scores from random sequences to STDOUT. '
'This is useful for piping the scores. Otherwise outputs just the p-value.')
parser.add_argument('--threads', type=str, default='0', help='[0] Sets the amount of threads used for IntaRNA.')
parser.add_argument('--randSeed', type=str, default=None,
help='Random seed to make sequence shuffling and thus output deterministic.')
args = parser.parse_args(test_args)
allowed = ['G', 'A', 'C', 'U', 'T']
if os.path.exists(args.query):
args.query = self.read_fasta_file(args.query)
if os.path.exists(args.target):
args.target = self.read_fasta_file(args.target)
if args.parameterFile and not os.path.exists(args.parameterFile):
print("Cannot find provided parameter file '"+args.parameterFile+"'")
sys.exit(1)
if False in [n in allowed for n in args.query.upper()] or False in [n in allowed for n in args.target.upper()]:
print('A sequence you specified contains illegal characters, allowed: G, A, C, U (T)')
sys.exit(1)
self.shuffle_query = True if args.shuffleMode in ['b', 'q'] else False
self.shuffle_target = True if args.shuffleMode in ['b', 't'] else False
for key, value in args.__dict__.items():
self.__setattr__(key, value)
random.seed(a=args.randSeed)
@staticmethod
def read_fasta_file(filename: str) -> str:
"""Reads a FASTA file and returns the sequence. Exits the program if something goes wrong.
>>> with open('test.fasta', 'w') as f:
... f.write('>somename\\nGACUGGAGUGC')
21
>>> IntaRNApvalue.read_fasta_file('test.fasta')
'GACUGGAGUGC'
>>> IntaRNApvalue.read_fasta_file('test2.fasta')
Traceback (most recent call last):
...
SystemExit: 1
>>> os.remove('test.fasta')
"""
try:
with open(filename) as f:
data = f.read().split('\n')
if not data[0].startswith('>'):
print('Error: {} is not in FASTA format!'.format(filename))
sys.exit(1)
f.close()
return data[1].upper().replace('T', 'U')
except FileNotFoundError as e: # In theory this can never happen, lets prevent it anyways
print(e)
sys.exit(1)
except IndexError:
print('Error: {} is not in FASTA format!'.format(filename))
sys.exit(1)
@staticmethod
def shuffle_sequence(seq: str, n: int) -> List[str]:
"""Shuffles a sequence n times and returns a list of sequences, duplicate entries are possible
>>> random.seed('IntaRNA')
>>> IntaRNApvalue.shuffle_sequence('AGGAUGGGGGA', 5)
['AUGGAGGGGGA', 'AUGGGGAGGGA', 'AGGGGAUGGGA', 'AUGGGGGAGGA', 'AUGGGAGGGGA']
"""
return [dinucl_shuffle.dinucl_shuffle(seq.upper()) for _ in range(n)]
@staticmethod
def to_fasta(sequences: List[str]) -> str:
"""Combines a list of sequences into a string in FASTA format
>>> IntaRNApvalue.to_fasta(['AUGGAGGGGGA', 'AUGGGGAGGGA', 'AGGGGAUGGGA', 'AUGGGGGAGGA', 'AUGGGAGGGGA'])
'>0\\nAUGGAGGGGGA\\n>1\\nAUGGGGAGGGA\\n>2\\nAGGGGAUGGGA\\n>3\\nAUGGGGGAGGA\\n>4\\nAUGGGAGGGGA\\n'
"""
fasta_str = ''
n = 0
for seq in sequences:
fasta_str += '>{}\n{}\n'.format(n, seq)
n += 1
return fasta_str
def get_scores(self) -> Tuple[List[float], int]:
"""Calculates n IntaRNA energy scores from random sequences with given parameters as class variables"""
scores = []
missing = self.samples
non_interactions = 0
paramFile = ""
if self.parameterFile :
paramFile = "--parameterFile="+self.parameterFile
while missing > 0:
if self.shuffle_query and self.shuffle_target: # shuffle both
query = self.shuffle_sequence(self.query, 1)[0] # get a random query
target = 'STDIN'
shuffles = self.to_fasta(self.shuffle_sequence(self.query, missing))
elif self.shuffle_query and not self.shuffle_target: # only shuffle query
query = 'STDIN'
target = self.target # target stays the same
shuffles = self.to_fasta(self.shuffle_sequence(self.query, missing))
else: # only shuffle target
query = self.query # query stays the same
target = 'STDIN'
shuffles = self.to_fasta(self.shuffle_sequence(self.target, missing))
p = Popen([self.bin, '-q', query, '-t', target, '--outMode=C', '--outCsvCols=E', '--threads', self.threads, paramFile],
stdout=PIPE, stdin=PIPE, universal_newlines=True)
stdout, stderr = p.communicate(input=shuffles) # send shuffles as STDIN
if p.returncode: # If IntaRNA exits with a returncode != 0, skip this iteration
print("IntaRNA failed (ret={})".format(p.returncode))
continue
stdout = stdout.split('\n') # split on newline
del stdout[0], stdout[-1] # remove first element aka 'E' and trailing newline element
scores.extend(stdout) # add elements to scores
missing = self.samples - len(scores)
non_interactions += missing # count non-interactions
# return list with all elements as float and amount of non-interactions
return [float(x) for x in scores], non_interactions
def get_single_score(self, query, target) -> float:
"""Gets an IntaRNA score to a single query/target combination"""
o = run('{} -q {} -t {} --outMode=C --outCsvCols=E --threads {}'.format(self.bin, query, target, self.threads),
stdout=PIPE, stdin=PIPE, shell=True, universal_newlines=True).stdout
if o.startswith('E') and o != 'E\n': # Check that we got a result
return float(o.split('\n')[1])
else:
return 0 # no interaction
@staticmethod
def calculate_pvalue_empirical(original_score, scores: list = None) -> float:
"""Calculates a p-value to a target/query combination empirical with a given amount of shuffle iterations
>>> i = IntaRNApvalue(['-q', 'AGGAUG', '-t', 'UUUAUCGUU', '--scores', '10', '-m', 'b', '--threads', '3'])
>>> i.calculate_pvalue_empirical(-10.0, [-1.235, -1.435645, -6.234234, -12.999, -15.23, -6.98, -6.23, -2.78])
0.25
"""
return [score <= original_score for score in scores].count(True) / len(scores)
@staticmethod
def calculate_pvalue_gauss(original_score, scores: list) -> float:
"""Calculates a p-value to a target/query combination by int. with a given amount of shuffle iterations by
fitting a gaussian distribution and integrating from -inf to the original score
>>> i = IntaRNApvalue(['-q', 'AGGAUG', '-t', 'UUUAUCGUU', '--scores', '10', '-m', 'b', '--threads', '3'])
>>> i.calculate_pvalue_gauss(-10.0, [-1.235, -1.435645, -6.234234, -12.999, -15.23, -6.98, -6.23, -2.78])
0.2429106747265256
"""
loc, scale = gauss.fit(scores)
def f(x):
return gauss.pdf(x, loc=loc, scale=scale)
return integ(f, -np.inf, original_score)[0]
@staticmethod
def calculate_pvalue_gumbel(original_score: float, scores: list) -> float:
"""Calculates a p-value to a target/query combination by int. with a given amount of shuffle iterations by
fitting a gumbel distribution and integrating from -inf to the original score
>>> i = IntaRNApvalue(['-q', 'AGGAUG', '-t', 'UUUAUCGUU', '--scores', '10', '-m', 'b', '--threads', '3'])
>>> i.calculate_pvalue_gumbel(-10.0, [-1.235, -1.435645, -6.234234, -12.999, -15.23, -6.98, -6.23, -2.78])
0.19721934073203196
"""
loc, scale = gum.fit(scores)
def f(x):
return gum.pdf(x, loc=loc, scale=scale)
return integ(f, -np.inf, original_score)[0]
@staticmethod
def calculate_pvalue_gev(original_score: float, scores: list) -> float:
"""Calculates a p-value to a target/query combination by int. with a given amount of shuffle iterations by
fitting a generalized extreme value distribution and integrating from -inf to the original score
>>> i = IntaRNApvalue(['-q', 'AGGAUG', '-t', 'UUUAUCGUU', '--scores', '10', '-m', 'b', '--threads', '3'])
>>> i.calculate_pvalue_gev(-10.0, [-1.235, -1.435645, -6.234234, -12.999, -15.23, -6.98, -6.23, -2.78])
0.17611816922560236
"""
shape, loc, scale = gev.fit(scores)
def f(x):
return gev.pdf(x, shape, loc=loc, scale=scale)
return integ(f, -np.inf, original_score)[0]
if __name__ == '__main__':
i = IntaRNApvalue()