forked from c-amr/camr
-
Notifications
You must be signed in to change notification settings - Fork 0
/
multiprocess.py
195 lines (154 loc) · 5.71 KB
/
multiprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#!/usr/bin/env python
# coding=utf-8
import sys, os, traceback
from multiprocessing import Queue, Process, Lock, JoinableQueue, cpu_count
from multiprocessing.sharedctypes import Value
from Queue import Empty
from progress import Progress
from threading import Thread
# parse process body
def worker(processor, input_queue, results_queue, count, sync_after=0):
    """Worker-process body: consume (index, data) items from input_queue,
    run processor(data) and push (index, result) onto results_queue.

    The worker exits when input_queue stays empty for 2 seconds (Queue.Empty).

    count is a shared multiprocessing.Value('i') progress counter; it is
    updated in batches of at least `sync_after` items to limit lock traffic
    (sync_after=0 means the counter is synced after every item).
    """
    local_count = 0
    try:
        while True:
            # blocks for up to 2s; the Empty timeout is the shutdown signal
            item = input_queue.get(True, 2)
            if item:
                index, data = item
                result = processor(data)
                results_queue.put((index, result))
                local_count += 1
                input_queue.task_done()
                if local_count >= sync_after:
                    # `count.value += n` is a non-atomic read-modify-write:
                    # concurrent workers can lose increments.  Take the
                    # Value's lock to make the update safe.
                    with count.get_lock():
                        count.value += local_count
                    local_count = 0
    except KeyboardInterrupt:
        # print >> sys.stderr, 'Job interrupted'
        os.abort()  # abort instead of exit so that multiprocessing won't wait
        return
    except Empty:
        pass  # no work for 2s: normal termination
    except:
        traceback.print_exc()
    # flush any progress still buffered locally
    if local_count > 0:
        with count.get_lock():
            count.value += local_count
def process(input_data, processor, max_refresh_delay=0.3):
    """Apply `processor` to every element of `input_data` in parallel,
    using one worker process per CPU, and return the results in input order.

    input_data must support len() and enumeration (a sequence, not a bare
    generator).  max_refresh_delay bounds how long the result-gathering loop
    blocks before refreshing the progress bar.
    """
    # will use multiprocessing to parallelize parsing
    queue = JoinableQueue()
    results_queue = Queue()
    # populate queue, reserve place for results
    results = []
    for i, data in enumerate(input_data):
        queue.put((i, data))
        results.append(None)
    # Progress is project-local; presumably a console progress bar with
    # set()/complete() — confirm against the progress module
    p = Progress(len(input_data), estimate=True, values=True) # output progress bar
    # define jobs
    # shared counter the workers advance so the bar can move even while
    # results are still buffered in results_queue
    count = Value('i', 0)
    num_threads = cpu_count()
    # Python 2 integer division: batch size for workers' progress syncs;
    # may come out 0, in which case workers sync after every item
    sync_count = len(input_data)/1000/num_threads
    print 'Starting %i jobs ...' % num_threads
    jobs = [Process(target=worker, args=(processor, queue, results_queue, count, sync_count)) for i in range(num_threads)]
    try:
        # start jobs
        for job in jobs:
            job.start()
        # gathering results from jobs
        total_count = 0
        while total_count < len(input_data):
            try:
                item = results_queue.get(True, max_refresh_delay) # timeout delay small enough to update progress bar, see below
                results[item[0]] = item[1]
                total_count += 1
            except Empty:
                pass
            p.set(count.value) # even if no results are received (cached somewhere), the counter will be updated after get() timeout above
        # NOTE: There might be a slight delay after reaching 100%, because the finished results counter is ahead of received results counter;
        # will stay at 100% until all results are received.
        p.set(total_count)
        p.complete()
        # wait for jobs to finish
        queue.join()
        for job in jobs:
            job.join()
    except KeyboardInterrupt:
        print >> sys.stderr, '\nInterrupted, aborting'
        os.abort() # abort instead of exit so that multiprocessing won't wait
    except:
        traceback.print_exc()
    return results
class Processor(Thread):
    """Manager thread that owns a pool of worker processes and feeds them work.

    A *factory* (rather than a ready-made processor) is handed over so that
    each child process constructs its own processor instance after the fork.
    Instances are callable: processor(items) returns the per-item results in
    the original input order.  Call stop() to shut the pool down.
    """

    def __init__(self, processor_factory, processes=4):
        Thread.__init__(self)
        self.processor_factory = processor_factory
        self.input_queue = JoinableQueue()   # work items: (index, data)
        self.results_queue = Queue()         # results: (index, result)
        self.processes = processes
        self._stop = False                   # polled by run() in this thread
        self.__stop = None                   # shared byte flag for the children
        self.start()                         # manager thread starts immediately

    def stop(self):
        """Request shutdown of the manager thread and all worker processes."""
        self._stop = True
        if self.__stop:
            self.__stop.value = 1

    @staticmethod
    def worker(make_processor, input_queue, results_queue, stop):
        """Child-process body: build a processor and serve the input queue
        until the shared stop flag is raised."""
        processor = None
        try:
            processor = make_processor()
            while stop.value == 0:
                try:
                    item = input_queue.get(True, 0.5)
                except Empty:
                    continue  # timed out: go back and re-check the stop flag
                if item:
                    index, data = item
                    result = processor(data)
                    results_queue.put((index, result))
                    input_queue.task_done()
        except KeyboardInterrupt:
            pass
        except:
            traceback.print_exc()
        finally:
            # give stateful processors a chance to release their resources
            if processor and hasattr(processor, 'stop'):
                processor.stop()

    def run(self):
        """Manager loop: spawn the pool, idle until stop() is called, then
        signal the children, drain the queue and wait for them to exit."""
        try:
            stop = Value('B', 0)
            self.__stop = stop
            jobs = [Process(target=self.worker,
                            args=(self.processor_factory, self.input_queue,
                                  self.results_queue, stop))
                    for _ in range(self.processes)]
            import time
            for job in jobs:
                job.start()
            # sleep-poll until stop() flips the flag
            while not self._stop:
                time.sleep(0.5)
            stop.value = 1
            self.input_queue.join()  # all queued work acknowledged
            for job in jobs:
                job.join()
        except KeyboardInterrupt:
            pass
        except:
            traceback.print_exc()

    def __call__(self, input_data):
        """Distribute input_data over the pool, block until every result has
        arrived, and return the results in the original order."""
        slots = []
        for position, payload in enumerate(input_data):
            self.input_queue.put((position, payload))
            slots.append(None)
        received = 0
        while received < len(input_data):
            try:
                index, result = self.results_queue.get(True, 0.5)  # waiting happens here
            except Empty:
                continue
            slots[index] = result
            received += 1
        return slots