forked from smicallef/spiderfoot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
control.py
3631 lines (2744 loc) · 126 KB
/
control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright 2011-2015, Damian Johnson and The Tor Project
# See LICENSE for licensing information
"""
Module for interacting with the Tor control socket. The
:class:`~stem.control.Controller` is a wrapper around a
:class:`~stem.socket.ControlSocket`, retaining many of its methods (connect,
close, is_alive, etc) in addition to providing its own for working with the
socket at a higher level.
Stem has `several ways <../faq.html#how-do-i-connect-to-tor>`_ of getting a
:class:`~stem.control.Controller`, but the most flexible are
:func:`~stem.control.Controller.from_port` and
:func:`~stem.control.Controller.from_socket_file`. These static
:class:`~stem.control.Controller` methods give you an **unauthenticated**
Controller you can then authenticate yourself using its
:func:`~stem.control.Controller.authenticate` method. For example...
::
import getpass
import sys
import stem
import stem.connection
from stem.control import Controller
if __name__ == '__main__':
try:
controller = Controller.from_port()
except stem.SocketError as exc:
print("Unable to connect to tor on port 9051: %s" % exc)
sys.exit(1)
try:
controller.authenticate()
except stem.connection.MissingPassword:
pw = getpass.getpass("Controller password: ")
try:
controller.authenticate(password = pw)
except stem.connection.PasswordAuthFailed:
print("Unable to authenticate, password is incorrect")
sys.exit(1)
except stem.connection.AuthenticationFailure as exc:
print("Unable to authenticate: %s" % exc)
sys.exit(1)
print("Tor is running version %s" % controller.get_version())
controller.close()
If you're fine with allowing your script to raise exceptions then this can be more nicely done as...
::
from stem.control import Controller
if __name__ == '__main__':
with Controller.from_port() as controller:
controller.authenticate()
print("Tor is running version %s" % controller.get_version())
**Module Overview:**
::
Controller - General controller class intended for direct use
| |- from_port - Provides a Controller based on a port connection.
| +- from_socket_file - Provides a Controller based on a socket file connection.
|
|- authenticate - authenticates this controller with tor
|
|- get_info - issues a GETINFO query for a parameter
|- get_version - provides our tor version
|- get_exit_policy - provides our exit policy
|- get_ports - provides the local ports where tor is listening for connections
|- get_listeners - provides the addresses and ports where tor is listening for connections
|- get_accounting_stats - provides stats related to relaying limits
|- get_protocolinfo - information about the controller interface
|- get_user - provides the user tor is running as
|- get_pid - provides the pid of our tor process
|
|- get_microdescriptor - querying the microdescriptor for a relay
|- get_microdescriptors - provides all currently available microdescriptors
|- get_server_descriptor - querying the server descriptor for a relay
|- get_server_descriptors - provides all currently available server descriptors
|- get_network_status - querying the router status entry for a relay
|- get_network_statuses - provides all presently available router status entries
|- get_hidden_service_descriptor - queries the given hidden service descriptor
|
|- get_conf - gets the value of a configuration option
|- get_conf_map - gets the values of multiple configuration options
|- set_conf - sets the value of a configuration option
|- reset_conf - reverts configuration options to their default values
|- set_options - sets or resets the values of multiple configuration options
|
|- get_hidden_service_conf - provides our hidden service configuration
|- set_hidden_service_conf - sets our hidden service configuration
|- create_hidden_service - creates a new hidden service or adds a new port
|- remove_hidden_service - removes a hidden service or drops a port
|
|- list_ephemeral_hidden_services - list ephemeral hidden services
|- create_ephemeral_hidden_service - create a new ephemeral hidden service
|- remove_ephemeral_hidden_service - removes an ephemeral hidden service
|
|- add_event_listener - attaches an event listener to be notified of tor events
|- remove_event_listener - removes a listener so it isn't notified of further events
|
|- is_caching_enabled - true if the controller has enabled caching
|- set_caching - enables or disables caching
|- clear_cache - clears any cached results
|
|- load_conf - loads configuration information as if it was in the torrc
|- save_conf - saves configuration information to the torrc
|
|- is_feature_enabled - checks if a given controller feature is enabled
|- enable_feature - enables a controller feature that has been disabled by default
|
|- get_circuit - provides an active circuit
|- get_circuits - provides a list of active circuits
|- new_circuit - create new circuits
|- extend_circuit - create new circuits and extend existing ones
|- repurpose_circuit - change a circuit's purpose
|- close_circuit - close a circuit
|
|- get_streams - provides a list of active streams
|- attach_stream - attach a stream to a circuit
|- close_stream - close a stream
|
|- signal - sends a signal to the tor client
|- is_newnym_available - true if tor would currently accept a NEWNYM signal
|- get_newnym_wait - seconds until tor would accept a NEWNYM signal
|- get_effective_rate - provides our effective relaying rate limit
|- is_geoip_unavailable - true if we've discovered our geoip db to be unavailable
|- map_address - maps one address to another such that connections to the original are replaced with the other
+- drop_guards - drops our set of guard relays and picks a new set
BaseController - Base controller class for asynchronous message handling
|- msg - communicates with the tor process
|- is_alive - reports if our connection to tor is open or closed
|- is_localhost - returns if the connection is for the local system or not
|- connection_time - time when we last connected or disconnected
|- is_authenticated - checks if we're authenticated to tor
|- connect - connects or reconnects to tor
|- close - shuts down our connection to the tor process
|- get_socket - provides the socket used for control communication
|- get_latest_heartbeat - timestamp for when we last heard from tor
|- add_status_listener - notifies a callback of changes in our status
|- remove_status_listener - prevents further notification of status changes
+- __enter__ / __exit__ - manages socket connection
.. data:: State (enum)
Enumeration for states that a controller can have.
========== ===========
State Description
========== ===========
**INIT** new control connection
**RESET** received a reset/sighup signal
**CLOSED** control connection closed
========== ===========
.. data:: EventType (enum)
Known types of events that the
:func:`~stem.control.Controller.add_event_listener` method of the
:class:`~stem.control.Controller` can listen for.
The most frequently listened for event types tend to be the logging events
(**DEBUG**, **INFO**, **NOTICE**, **WARN**, and **ERR**), bandwidth usage
(**BW**), and circuit or stream changes (**CIRC** and **STREAM**).
Enums are mapped to :class:`~stem.response.events.Event` subclasses as
follows...
======================= ===========
EventType Event Class
======================= ===========
**ADDRMAP** :class:`stem.response.events.AddrMapEvent`
**AUTHDIR_NEWDESCS** :class:`stem.response.events.AuthDirNewDescEvent`
**BUILDTIMEOUT_SET** :class:`stem.response.events.BuildTimeoutSetEvent`
**BW** :class:`stem.response.events.BandwidthEvent`
**CELL_STATS** :class:`stem.response.events.CellStatsEvent`
**CIRC** :class:`stem.response.events.CircuitEvent`
**CIRC_BW** :class:`stem.response.events.CircuitBandwidthEvent`
**CIRC_MINOR** :class:`stem.response.events.CircMinorEvent`
**CLIENTS_SEEN** :class:`stem.response.events.ClientsSeenEvent`
**CONF_CHANGED** :class:`stem.response.events.ConfChangedEvent`
**CONN_BW** :class:`stem.response.events.ConnectionBandwidthEvent`
**DEBUG** :class:`stem.response.events.LogEvent`
**DESCCHANGED** :class:`stem.response.events.DescChangedEvent`
**ERR** :class:`stem.response.events.LogEvent`
**GUARD** :class:`stem.response.events.GuardEvent`
**HS_DESC** :class:`stem.response.events.HSDescEvent`
**HS_DESC_CONTENT** :class:`stem.response.events.HSDescContentEvent`
**INFO** :class:`stem.response.events.LogEvent`
**NEWCONSENSUS** :class:`stem.response.events.NewConsensusEvent`
**NEWDESC** :class:`stem.response.events.NewDescEvent`
**NOTICE** :class:`stem.response.events.LogEvent`
**NS** :class:`stem.response.events.NetworkStatusEvent`
**ORCONN** :class:`stem.response.events.ORConnEvent`
**SIGNAL** :class:`stem.response.events.SignalEvent`
**STATUS_CLIENT** :class:`stem.response.events.StatusEvent`
**STATUS_GENERAL** :class:`stem.response.events.StatusEvent`
**STATUS_SERVER** :class:`stem.response.events.StatusEvent`
**STREAM** :class:`stem.response.events.StreamEvent`
**STREAM_BW** :class:`stem.response.events.StreamBwEvent`
**TB_EMPTY** :class:`stem.response.events.TokenBucketEmptyEvent`
**TRANSPORT_LAUNCHED** :class:`stem.response.events.TransportLaunchedEvent`
**WARN** :class:`stem.response.events.LogEvent`
======================= ===========
.. data:: Listener (enum)
Purposes for inbound connections that Tor handles.
============= ===========
Listener Description
============= ===========
**OR** traffic we're relaying as a member of the network (torrc's **ORPort** and **ORListenAddress**)
**DIR** mirroring for tor descriptor content (torrc's **DirPort** and **DirListenAddress**)
**SOCKS** client traffic we're sending over Tor (torrc's **SocksPort** and **SocksListenAddress**)
**TRANS** transparent proxy handling (torrc's **TransPort** and **TransListenAddress**)
**NATD** forwarding for ipfw NATD connections (torrc's **NatdPort** and **NatdListenAddress**)
**DNS** DNS lookups for our traffic (torrc's **DNSPort** and **DNSListenAddress**)
**CONTROL** controller applications (torrc's **ControlPort** and **ControlListenAddress**)
============= ===========
"""
import calendar
import collections
import functools
import inspect
import io
import os
import threading
import time
try:
# Added in 2.7
from collections import OrderedDict
except ImportError:
from stem.util.ordereddict import OrderedDict
try:
import queue
from io import StringIO
except ImportError:
import Queue as queue
from StringIO import StringIO
import stem.descriptor.microdescriptor
import stem.descriptor.reader
import stem.descriptor.router_status_entry
import stem.descriptor.server_descriptor
import stem.exit_policy
import stem.response
import stem.response.events
import stem.socket
import stem.util.connection
import stem.util.enum
import stem.util.str_tools
import stem.util.system
import stem.util.tor_tools
import stem.version
from stem import UNDEFINED, CircStatus, Signal, str_type
from stem.util import log
# State changes a control socket can have. These are the values handed to
# status listeners registered through BaseController.add_status_listener().
State = stem.util.enum.Enum('INIT', 'RESET', 'CLOSED')
# Event types that Controller.add_event_listener() can listen for. The module
# docstring lists the stem.response.events class each of these is mapped to.
EventType = stem.util.enum.UppercaseEnum(
'ADDRMAP',
'AUTHDIR_NEWDESCS',
'BUILDTIMEOUT_SET',
'BW',
'CELL_STATS',
'CIRC',
'CIRC_BW',
'CIRC_MINOR',
'CONF_CHANGED',
'CONN_BW',
'CLIENTS_SEEN',
'DEBUG',
'DESCCHANGED',
'ERR',
'GUARD',
'HS_DESC',
'HS_DESC_CONTENT',
'INFO',
'NEWCONSENSUS',
'NEWDESC',
'NOTICE',
'NS',
'ORCONN',
'SIGNAL',
'STATUS_CLIENT',
'STATUS_GENERAL',
'STATUS_SERVER',
'STREAM',
'STREAM_BW',
'TB_EMPTY',
'TRANSPORT_LAUNCHED',
'WARN',
)
# Purposes for inbound connections that tor handles. The module docstring
# describes the torrc options that each of these corresponds to.
Listener = stem.util.enum.UppercaseEnum(
'OR',
'DIR',
'SOCKS',
'TRANS',
'NATD',
'DNS',
'CONTROL',
)
# Configuration options that are fetched by a special key. The keys are
# lowercase to make case insensitive lookups easier. Every one of these is
# retrieved through 'HiddenServiceOptions'.
MAPPED_CONFIG_KEYS = dict.fromkeys(
    (
        'hiddenservicedir',
        'hiddenserviceport',
        'hiddenserviceversion',
        'hiddenserviceauthorizeclient',
        'hiddenserviceoptions',
    ),
    'HiddenServiceOptions',
)

# GETINFO parameters whose values never change, so they are safe to cache.
CACHEABLE_GETINFO_PARAMS = (
    'version', 'config-file', 'exit-policy/default', 'fingerprint',
    'config/names', 'config/defaults', 'info/names', 'events/names',
    'features/names', 'process/descriptor-limit',
)

# GETCONF parameters we shouldn't cache. This includes hidden service
# parameters due to the funky way they're set and retrieved (for instance,
# 'SETCONF HiddenServiceDir' affects 'GETCONF HiddenServiceOptions').
UNCACHEABLE_GETCONF_PARAMS = (
    'hiddenserviceoptions', 'hiddenservicedir', 'hiddenserviceport',
    'hiddenserviceversion', 'hiddenserviceauthorizeclient',
)

# Number of sequential attempts before we decide that the Tor geoip database
# is unavailable.
GEOIP_FAILURE_THRESHOLD = 5

# Message explaining why server descriptors are not available.
SERVER_DESCRIPTORS_UNSUPPORTED = (
    "Tor is currently not configured to retrieve "
    "server descriptors. As of Tor version 0.2.3.25 it downloads microdescriptors "
    "instead unless you set 'UseMicrodescriptors 0' in your torrc."
)
# Record of accounting information: when it was retrieved, the accounting
# status and interval, and the read/write byte counters and limits.
AccountingStats = collections.namedtuple(
    'AccountingStats',
    'retrieved status interval_end time_until_reset '
    'read_bytes read_bytes_left read_limit '
    'written_bytes write_bytes_left write_limit',
)

# Record describing a created hidden service: its path, hostname(s) and
# configuration.
CreateHiddenServiceOutput = collections.namedtuple(
    'CreateHiddenServiceOutput',
    'path hostname hostname_for_client config',
)
def with_default(yields = False):
    """
    Provides a decorator to support having a default value. This should be
    treated as private.

    The decorated method is expected to accept a **default** argument. If the
    method raises an exception and the caller supplied a **default** (by
    position or keyword) then that value is provided instead. If no default was
    given (it's **UNDEFINED**) the exception is re-raised.

    :param bool yields: **True** if the decorated method is a generator, in
      which case the values of an iterable **default** are yielded after the
      failure (a **None** default yields nothing further)

    :returns: decorator for methods that accept a **default** argument
    """

    def decorator(func):
        def get_default(func, args, kwargs):
            # inspect.getargspec() was removed in Python 3.11. Prefer its
            # replacement, falling back for interpreters that predate it.

            getargspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
            arg_names = getargspec(func).args[1:]  # drop 'self'
            default_position = arg_names.index('default') if 'default' in arg_names else None

            if default_position is not None and default_position < len(args):
                return args[default_position]
            else:
                return kwargs.get('default', UNDEFINED)

        if not yields:
            @functools.wraps(func)
            def wrapped(self, *args, **kwargs):
                try:
                    return func(self, *args, **kwargs)
                except Exception as exc:
                    default = get_default(func, args, kwargs)

                    if default == UNDEFINED:
                        raise exc
                    else:
                        return default
        else:
            @functools.wraps(func)
            def wrapped(self, *args, **kwargs):
                try:
                    for val in func(self, *args, **kwargs):
                        yield val
                except Exception as exc:
                    default = get_default(func, args, kwargs)

                    if default == UNDEFINED:
                        raise exc
                    else:
                        # a None default simply ends the iteration
                        if default is not None:
                            for val in default:
                                yield val

        return wrapped

    return decorator
class BaseController(object):
    """
    Controller for the tor process. This is a minimal base class for other
    controllers, providing basic process communication and event listing. Don't
    use this directly - subclasses like the :class:`~stem.control.Controller`
    provide higher level functionality.

    It's highly suggested that you don't interact directly with the
    :class:`~stem.socket.ControlSocket` that we're constructed from - use our
    wrapper methods instead.

    If the **control_socket** is already authenticated to Tor then the caller
    should provide the **is_authenticated** flag. Otherwise, we will treat the
    socket as though it hasn't yet been authenticated.

    :param stem.socket.ControlSocket control_socket: socket we communicate
      with tor over
    :param bool is_authenticated: **True** if the socket has already been
      authenticated, **False** otherwise
    """

    def __init__(self, control_socket, is_authenticated = False):
        self._socket = control_socket
        self._msg_lock = threading.RLock()

        self._status_listeners = []  # tuples of the form (callback, spawn_thread)
        self._status_listeners_lock = threading.RLock()

        # queues where incoming messages are directed
        self._reply_queue = queue.Queue()
        self._event_queue = queue.Queue()

        # thread to continually pull from the control socket
        self._reader_thread = None

        # thread to pull from the _event_queue and call handle_event
        self._event_notice = threading.Event()
        self._event_thread = None

        # saves our socket's prior _connect() and _close() methods so they can be
        # called along with ours

        self._socket_connect = self._socket._connect
        self._socket_close = self._socket._close

        self._socket._connect = self._connect
        self._socket._close = self._close

        self._last_heartbeat = 0.0  # timestamp for when we last heard from tor
        self._is_authenticated = False

        self._state_change_threads = []  # threads we've spawned to notify of state changes

        if self._socket.is_alive():
            self._launch_threads()

        if is_authenticated:
            self._post_authentication()

    def msg(self, message):
        """
        Sends a message to our control socket and provides back its reply.

        :param str message: message to be formatted and sent to tor

        :returns: :class:`~stem.response.ControlMessage` with the response

        :raises:
          * :class:`stem.ProtocolError` the content from the socket is
            malformed
          * :class:`stem.SocketError` if a problem arises in using the
            socket
          * :class:`stem.SocketClosed` if the socket is shut down
        """

        with self._msg_lock:
            # If our _reply_queue isn't empty then one of a few things happened...
            #
            # - Our connection was closed and probably re-established. This was
            #   in reply to pulling for an asynchronous event and getting this is
            #   expected - ignore it.
            #
            # - Pulling for asynchronous events produced an error. If this was a
            #   ProtocolError then it's a tor bug, and if a non-closure SocketError
            #   then it was probably a socket glitch. Deserves an INFO level log
            #   message.
            #
            # - This is a leftover response for a msg() call. We can't tell who an
            #   exception was earmarked for, so we only know that this was the case
            #   if it's a ControlMessage.
            #
            #   This is the most concerning situation since it indicates that one of
            #   our callers didn't get their reply. However, this is still a
            #   perfectly viable use case. For instance...
            #
            #   1. We send a request.
            #   2. The reader thread encounters an exception, for instance a socket
            #      error. We enqueue the exception.
            #   3. The reader thread receives the reply.
            #   4. We raise the socket error, and have an undelivered message.
            #
            #   Thankfully this only seems to arise in edge cases around rapidly
            #   closing/reconnecting the socket.

            while not self._reply_queue.empty():
                try:
                    response = self._reply_queue.get_nowait()

                    if isinstance(response, stem.SocketClosed):
                        pass  # this is fine
                    elif isinstance(response, stem.ProtocolError):
                        log.info('Tor provided a malformed message (%s)' % response)
                    elif isinstance(response, stem.ControllerError):
                        log.info('Socket experienced a problem (%s)' % response)
                    elif isinstance(response, stem.response.ControlMessage):
                        log.info('Failed to deliver a response: %s' % response)
                except queue.Empty:
                    # the empty() method is documented to not be fully reliable so this
                    # isn't entirely surprising

                    break

            try:
                self._socket.send(message)
                response = self._reply_queue.get()

                # If the message we received back had an exception then re-raise it to the
                # caller. Otherwise return the response.

                if isinstance(response, stem.ControllerError):
                    raise response
                else:
                    # I really, really don't like putting hooks into this method, but
                    # this is the most reliable method I can think of for taking actions
                    # immediately after successfully authenticating to a connection.

                    if message.upper().startswith('AUTHENTICATE'):
                        self._post_authentication()

                    return response
            except stem.SocketClosed as exc:
                # If the recv() thread caused the SocketClosed then we could still be
                # in the process of closing. Calling close() here so that we can
                # provide an assurance to the caller that when we raise a SocketClosed
                # exception we are shut down afterward for realz.

                self.close()
                raise exc

    def is_alive(self):
        """
        Checks if our socket is currently connected. This is a pass-through for our
        socket's :func:`~stem.socket.ControlSocket.is_alive` method.

        :returns: **bool** that's **True** if our socket is connected and **False** otherwise
        """

        return self._socket.is_alive()

    def is_localhost(self):
        """
        Returns if the connection is for the local system or not.

        .. versionadded:: 1.3.0

        :returns: **bool** that's **True** if the connection is for the local host and **False** otherwise
        """

        return self._socket.is_localhost()

    def connection_time(self):
        """
        Provides the unix timestamp for when our socket was either connected or
        disconnected. That is to say, the time we connected if we're currently
        connected and the time we disconnected if we're not connected.

        .. versionadded:: 1.3.0

        :returns: **float** for when we last connected or disconnected, zero if
          we've never connected
        """

        return self._socket.connection_time()

    def is_authenticated(self):
        """
        Checks if our socket is both connected and authenticated.

        :returns: **bool** that's **True** if our socket is authenticated to tor
          and **False** otherwise
        """

        if self.is_alive():
            return self._is_authenticated

        return False

    def connect(self):
        """
        Reconnects our control socket. This is a pass-through for our socket's
        :func:`~stem.socket.ControlSocket.connect` method.

        :raises: :class:`stem.SocketError` if unable to make a socket
        """

        self._socket.connect()

    def close(self):
        """
        Closes our socket connection. This is a pass-through for our socket's
        :func:`~stem.socket.ControlSocket.close` method.
        """

        self._socket.close()

        # Join on any outstanding state change listeners. Closing is a state change
        # of its own, so if we have any listeners it's quite likely there's some
        # work in progress.
        #
        # It's important that we do this outside of our locks so those daemons have
        # access to us. This is why we're doing this here rather than _close().

        for t in self._state_change_threads:
            if t.is_alive() and threading.current_thread() != t:
                t.join()

    def get_socket(self):
        """
        Provides the socket used to speak with the tor process. Communicating with
        the socket directly isn't advised since it may confuse this controller.

        :returns: :class:`~stem.socket.ControlSocket` we're communicating with
        """

        return self._socket

    def get_latest_heartbeat(self):
        """
        Provides the unix timestamp for when we last heard from tor. This is zero
        if we've never received a message.

        :returns: float for the unix timestamp of when we last heard from tor
        """

        return self._last_heartbeat

    def add_status_listener(self, callback, spawn = True):
        """
        Notifies a given function when the state of our socket changes. Functions
        are expected to be of the form...

        ::

          my_function(controller, state, timestamp)

        The state is a value from the :data:`stem.control.State` enum. Functions
        **must** allow for new values. The timestamp is a float for the unix time
        when the change occurred.

        This class only provides **State.INIT** and **State.CLOSED** notifications.
        Subclasses may provide others.

        If spawn is **True** then the callback is notified via a new daemon thread.
        If **False** then the notice is under our locks, within the thread where
        the change occurred. In general this isn't advised, especially if your
        callback could block for a while. If still outstanding these threads are
        joined on as part of closing this controller.

        :param function callback: function to be notified when our state changes
        :param bool spawn: calls function via a new thread if **True**, otherwise
          it's part of the connect/close method call
        """

        with self._status_listeners_lock:
            self._status_listeners.append((callback, spawn))

    def remove_status_listener(self, callback):
        """
        Stops listener from being notified of further events.

        :param function callback: function to be removed from our listeners

        :returns: **bool** that's **True** if we removed one or more occurrences of
          the callback, **False** otherwise
        """

        with self._status_listeners_lock:
            new_listeners, is_changed = [], False

            for listener, spawn in self._status_listeners:
                if listener != callback:
                    new_listeners.append((listener, spawn))
                else:
                    is_changed = True

            self._status_listeners = new_listeners
            return is_changed

    def __enter__(self):
        return self

    def __exit__(self, exit_type, value, traceback):
        self.close()

    def _handle_event(self, event_message):
        """
        Callback to be overwritten by subclasses for event listening. This is
        notified whenever we receive an event from the control socket.

        :param stem.response.ControlMessage event_message: message received from
          the control socket
        """

        pass

    def _connect(self):
        # Replacement for our socket's _connect() hook (installed by __init__),
        # which also invokes the hook the socket originally had.

        self._launch_threads()
        self._notify_status_listeners(State.INIT)
        self._socket_connect()
        self._is_authenticated = False

    def _close(self):
        # Replacement for our socket's _close() hook (installed by __init__),
        # which also invokes the hook the socket originally had.
        #
        # Our is_alive() state is now false. Our reader thread should already be
        # awake from recv() raising a closure exception. Wake up the event thread
        # too so it can end.

        self._event_notice.set()
        self._is_authenticated = False

        # joins on our threads if it's safe to do so

        for t in (self._reader_thread, self._event_thread):
            if t and t.is_alive() and threading.current_thread() != t:
                t.join()

        self._notify_status_listeners(State.CLOSED)
        self._socket_close()

    def _post_authentication(self):
        # actions to be taken after we have a newly authenticated connection

        self._is_authenticated = True

    def _notify_status_listeners(self, state):
        """
        Informs our status listeners that a state change occurred.

        :param stem.control.State state: state change that has occurred
        """

        # Any changes to our is_alive() state happen under the send lock, so we
        # need to have it to ensure it doesn't change beneath us.

        with self._socket._get_send_lock():
            with self._status_listeners_lock:
                # States imply that our socket is either alive or not, which may not
                # hold true when multiple events occur in quick succession. For
                # instance, a sighup could cause two events (State.RESET for the sighup
                # and State.CLOSE if it causes tor to crash). However, there's no
                # guarantee of the order in which they occur, and it would be bad if
                # listeners got the State.RESET last, implying that we were alive.

                expect_alive = None

                if state in (State.INIT, State.RESET):
                    expect_alive = True
                elif state == State.CLOSED:
                    expect_alive = False

                change_timestamp = time.time()

                if expect_alive is not None and expect_alive != self.is_alive():
                    return

                # drop notification threads that have already finished
                self._state_change_threads = [t for t in self._state_change_threads if t.is_alive()]

                for listener, spawn in self._status_listeners:
                    if spawn:
                        name = '%s notification' % state
                        args = (self, state, change_timestamp)

                        notice_thread = threading.Thread(target = listener, args = args, name = name)
                        notice_thread.daemon = True  # setDaemon() is deprecated as of Python 3.10
                        notice_thread.start()
                        self._state_change_threads.append(notice_thread)
                    else:
                        listener(self, state, change_timestamp)

    def _launch_threads(self):
        """
        Initializes daemon threads. Threads can't be reused so we need to recreate
        them if we're restarted.
        """

        # In theory concurrent calls could result in multiple start() calls on a
        # single thread, which would cause an unexpected exception. Best be safe.

        with self._socket._get_send_lock():
            if not self._reader_thread or not self._reader_thread.is_alive():
                self._reader_thread = threading.Thread(target = self._reader_loop, name = 'Tor Listener')
                self._reader_thread.daemon = True  # setDaemon() is deprecated as of Python 3.10
                self._reader_thread.start()

            if not self._event_thread or not self._event_thread.is_alive():
                self._event_thread = threading.Thread(target = self._event_loop, name = 'Event Notifier')
                self._event_thread.daemon = True
                self._event_thread.start()

    def _reader_loop(self):
        """
        Continually pulls from the control socket, directing the messages into
        queues based on their type. Controller messages come in two varieties...

        * Responses to messages we've sent (GETINFO, SETCONF, etc).
        * Asynchronous events, identified by a status code of 650.
        """

        while self.is_alive():
            try:
                control_message = self._socket.recv()
                self._last_heartbeat = time.time()

                if control_message.content()[-1][0] == '650':
                    # asynchronous message, adds to the event queue and wakes up its handler
                    self._event_queue.put(control_message)
                    self._event_notice.set()
                else:
                    # response to a msg() call
                    self._reply_queue.put(control_message)
            except stem.ControllerError as exc:
                # Assume that all exceptions belong to the reader. This isn't always
                # true, but the msg() call can do a better job of sorting it out.
                #
                # Be aware that the msg() method relies on this to unblock callers.

                self._reply_queue.put(exc)

    def _event_loop(self):
        """
        Continually pulls messages from the _event_queue and sends them to our
        handle_event callback. This is done via its own thread so subclasses with a
        lengthy handle_event implementation don't block further reading from the
        socket.
        """

        while True:
            try:
                event_message = self._event_queue.get_nowait()
                self._handle_event(event_message)
            except queue.Empty:
                if not self.is_alive():
                    break

                # sleep until the reader enqueues an event or _close() wakes us
                self._event_notice.wait()
                self._event_notice.clear()
class Controller(BaseController):
"""
Communicates with a control socket. This is built on top of the
BaseController and provides a more user friendly API for library users.
"""
@staticmethod
def from_port(address = '127.0.0.1', port = 9051):
    """
    Constructs a :class:`~stem.socket.ControlPort` based Controller.

    :param str address: ip address of the controller
    :param int port: port number of the controller

    :returns: :class:`~stem.control.Controller` attached to the given port

    :raises:
      * **ValueError** if the address isn't a valid IPv4 address or the port
        is invalid
      * :class:`stem.SocketError` if we're unable to establish a connection
    """

    # validate our arguments up front, before attempting to make a socket

    if not stem.util.connection.is_valid_ipv4_address(address):
        raise ValueError('Invalid IP address: %s' % address)

    if not stem.util.connection.is_valid_port(port):
        raise ValueError('Invalid port: %s' % port)

    return Controller(stem.socket.ControlPort(address, port))
@staticmethod
def from_socket_file(path = '/var/run/tor/control'):
    """
    Constructs a :class:`~stem.socket.ControlSocketFile` based Controller.

    :param str path: path where the control socket is located

    :returns: :class:`~stem.control.Controller` attached to the given socket file

    :raises: :class:`stem.SocketError` if we're unable to establish a connection
    """

    return Controller(stem.socket.ControlSocketFile(path))
def __init__(self, control_socket, is_authenticated = False):
    """
    Sets up our caching and event listener state, initializes the base
    controller, then registers listeners for SIGNAL and CONF_CHANGED events.

    :param control_socket: socket handed on to the base controller
    :param bool is_authenticated: flag handed on to the base controller
    """

    # All of our own state is prepared before the base initializer runs.

    self._cache_lock = threading.RLock()
    self._request_cache = {}
    self._is_caching_enabled = True
    self._last_newnym = 0.0
    self._enabled_features = []

    # number of sequential 'GETINFO ip-to-country/*' lookups that have failed
    self._geoip_failure_count = 0

    # mapping of event types to their listeners
    self._event_listeners_lock = threading.RLock()
    self._event_listeners = {}

    super(Controller, self).__init__(control_socket, is_authenticated)

    def _sighup_listener(event):
        # a RELOAD signal drops our cache and is reported as a RESET

        if event.signal == Signal.RELOAD:
            self.clear_cache()
            self._notify_status_listeners(State.RESET)

    def _confchanged_listener(event):
        if not self.is_caching_enabled():
            return

        # clears the cached value of every option that changed
        self._set_cache(dict.fromkeys(event.config), 'getconf')

        if 'exitpolicy' in event.config.keys():
            self._set_cache({'exitpolicy': None})

    self.add_event_listener(_sighup_listener, EventType.SIGNAL)
    self.add_event_listener(_confchanged_listener, EventType.CONF_CHANGED)
def connect(self):
    """
    Reconnects our control socket through the base controller, then clears
    any cached results.
    """

    super(Controller, self).connect()

    # drop anything we cached before reconnecting
    self.clear_cache()
def close(self):
    """
    Closes our socket connection. If we're still connected this first makes a
    best-effort attempt to send tor a QUIT, then clears our cache and closes
    the connection via the base controller.
    """

    try:
        # making a best-effort attempt to quit before detaching the socket

        if self.is_alive():
            try:
                self.msg('QUIT')
            except Exception:
                # Failing to say goodbye is fine, we're closing regardless. A
                # bare except would also swallow KeyboardInterrupt and SystemExit.

                pass
    finally:
        # always release our resources, even if we were interrupted above

        self.clear_cache()
        super(Controller, self).close()
def authenticate(self, *args, **kwargs):
    """
    Authenticates this controller. This is a convenience pass-through to
    :func:`stem.connection.authenticate`, which receives this controller
    followed by whatever arguments we were given.
    """

    # NOTE(review): imported at call time rather than module level, presumably
    # to avoid a circular import with stem.connection - confirm

    import stem.connection

    authenticator = stem.connection.authenticate
    authenticator(self, *args, **kwargs)
@with_default()
def get_info(self, params, default = UNDEFINED, get_bytes = False):
"""
get_info(params, default = UNDEFINED, get_bytes = False)
Queries the control socket for the given GETINFO option. If provided a
default then that's returned if the GETINFO option is undefined or the
call fails for any reason (error response, control port closed, initiated,