websockets is a library for building WebSocket servers and clients in Python with a focus on correctness and simplicity.

Built on top of asyncio, Python's standard asynchronous I/O framework, it provides an elegant coroutine-based API.

Documentation is available on Read the Docs.
Here's how a client sends and receives messages:
```python
#!/usr/bin/env python

import asyncio
import websockets

async def hello(uri):
    async with websockets.connect(uri) as websocket:
        await websocket.send("Hello world!")
        await websocket.recv()

asyncio.get_event_loop().run_until_complete(
    hello('ws://localhost:8765'))
```
And here's an echo server:
```python
#!/usr/bin/env python

import asyncio
import websockets

async def echo(websocket, path):
    async for message in websocket:
        await websocket.send(message)

asyncio.get_event_loop().run_until_complete(
    websockets.serve(echo, 'localhost', 8765))
asyncio.get_event_loop().run_forever()
```
Does that look good?
Get started with the tutorial!
Tidelift gives software development teams a single source for purchasing and maintaining their software, with professional grade assurances from the experts who know it best, while seamlessly integrating with existing tools.
Get supported websockets with the Tidelift Subscription
(If you contribute to ``websockets`` and would like to become an official support provider, let me know.)
The development of ``websockets`` is shaped by four principles:

- Simplicity: all you need to understand is ``msg = await ws.recv()`` and ``await ws.send(msg)``; ``websockets`` takes care of managing connections so you can focus on your application.
- Robustness: ``websockets`` is built for production; for example, it was the only library to handle backpressure correctly before the issue became widely known in the Python community.
- Quality: ``websockets`` is heavily tested. Continuous integration fails under 100% branch coverage. It also passes the industry-standard Autobahn Testsuite.
- Performance: memory use is configurable. An extension written in C accelerates expensive operations. It's pre-compiled for Linux, macOS and Windows and packaged in the wheel format for each system and Python version.
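On the memory point, the knobs are per-connection keyword arguments. A minimal sketch, assuming the ``max_size`` and ``max_queue`` arguments accepted by ``websockets.connect`` (the values shown are illustrative, not recommendations, and this needs a live server to actually run):

```python
import asyncio
import websockets

async def bounded_client(uri):
    # max_size caps the size of a single incoming message (in bytes);
    # max_queue caps how many messages are buffered before backpressure
    # is applied to the peer.
    async with websockets.connect(uri, max_size=2 ** 20, max_queue=16) as ws:
        await ws.send("Hello world!")
        await ws.recv()
```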
Documentation is a first-class concern in the project. Head over to Read the Docs and see for yourself.
This fork merges the proxy support work done in python-websockets#422, adapted for Python 3.7. It also adds a ``proxy_headers`` argument that can be used to deal with proxy authentication requirements.
NTLM is a challenge-response authentication protocol. SSPI is a Windows API that can compute the response using the current Windows user's login session (without needing to know their password).

To deal with a proxy requiring NTLM authentication, and only when NTLM authentication is actually needed, you can catch the 407 "Proxy Authentication Required" exception and then perform the NTLM authentication using SSPI to obtain the value to use for ``proxy_headers``:
```python
try:
    self.socket = await websockets.connect(url, proxy_headers=self.proxy_headers)
except ValueError as e:
    if "407" in str(e):
        self.proxy_headers = await get_proxy_auth_header_sspi(
            self.sfp.get_session(),
            os.environ['HTTPS_PROXY'] if url.startswith("wss") else os.environ['HTTP_PROXY'])
        # Headers are returned as a name-value dict, but websockets expects
        # a list of tuples, so convert.
        self.proxy_headers = list(self.proxy_headers.items())
        self.socket = await websockets.connect(url, proxy_headers=self.proxy_headers)
    else:
        raise
```
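The conversion step deserves a note: ``get_proxy_auth_header_sspi`` returns headers as a plain dict, while ``websockets`` expects extra headers as a list of (name, value) tuples. A minimal sketch (the token value is a made-up example):

```python
# Headers as returned by the SSPI helper (token value is made up).
proxy_headers = {"Proxy-Authorization": "NTLM TlRMTVNTUAAB"}

# websockets expects a list of (name, value) tuples.
header_list = list(proxy_headers.items())
print(header_list)  # → [('Proxy-Authorization', 'NTLM TlRMTVNTUAAB')]
```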
The get_proxy_auth_header_sspi function is provided below. It's a work in progress and doesn't belong inside the websockets package, because it's also needed when making requests via aiohttp (see below). It only covers a specific use case (NTLM via SSPI, not Kerberos, not username/password), so it isn't generic enough to suggest adding to aiohttp at this stage. Use at your own risk :)
```python
import base64
import hashlib
import logging
import socket
import struct
from urllib.parse import urlparse

import pywintypes
import sspi
import sspicon
import win32security

_logger = logging.getLogger(__name__)


async def get_proxy_auth_header_sspi(session, proxy_url, peercert=None, delegate=False, host=None):
    """Performs a GET request against the proxy server to start and complete
    an NTLM authentication process.

    Invoke this after getting a 407 error. Returns the proxy headers to use
    going forwards (as a dict).

    Overview of the protocol/exchange:
    https://docs.microsoft.com/en-us/openspecs/office_protocols/ms-grvhenc/b9e676e7-e787-4020-9840-7cfe7c76044a

    Inspired by:
    https://github.com/brandond/requests-negotiate-sspi/blob/master/requests_negotiate_sspi/requests_negotiate_sspi.py
    (but this is async, and it's for proxy auth, not normal www auth)
    """
    scheme = 'NTLM'
    if host is None:
        targeturl = urlparse(proxy_url)
        host = targeturl.hostname
        try:
            host = socket.getaddrinfo(host, None, 0, 0, 0, socket.AI_CANONNAME)[0][3]
        except socket.gaierror as e:
            _logger.info('Skipping canonicalization of name %s due to error: %s', host, e)
    targetspn = '{}/{}'.format("HTTP", host)

    # Set up SSPI connection structure
    pkg_info = win32security.QuerySecurityPackageInfo(scheme)
    clientauth = sspi.ClientAuth(scheme, targetspn=targetspn)
    sec_buffer = win32security.PySecBufferDescType()

    # Calling sspi.ClientAuth with scflags set requires you to specify all
    # the flags, including defaults. We would just want to add ISC_REQ_DELEGATE:
    # if delegate:
    #     clientauth.scflags |= sspicon.ISC_REQ_DELEGATE

    # Channel Binding Hash (aka Extended Protection for Authentication).
    # If this is an SSL connection, we need to hash the peer certificate,
    # prepend the RFC 5929 channel binding type, and stuff it into a
    # SEC_CHANNEL_BINDINGS structure. This should be sent along in the
    # initial handshake or Kerberos auth will fail.
    if peercert is not None:
        md = hashlib.sha256()
        md.update(peercert)
        appdata = 'tls-server-end-point:'.encode('ASCII') + md.digest()
        cbtbuf = win32security.PySecBufferType(pkg_info['MaxToken'], sspicon.SECBUFFER_CHANNEL_BINDINGS)
        cbtbuf.Buffer = struct.pack('LLLLLLLL{}s'.format(len(appdata)), 0, 0, 0, 0, 0, 0, len(appdata), 32, appdata)
        sec_buffer.append(cbtbuf)

    # Send initial challenge auth header
    try:
        error, auth = clientauth.authorize(sec_buffer)
        headers = {'Proxy-Authorization': f'{scheme} {base64.b64encode(auth[0].Buffer).decode("ASCII")}'}
        response2 = await session.get(proxy_url, headers=headers)
        _logger.debug('Got response: %s', response2)
    except pywintypes.error as e:
        _logger.debug('Error calling %s: %s', e.args[1], e.args[2], exc_info=e)
        raise

    # Expect to get a 407 error and a Proxy-Authenticate header
    if response2.status != 407:
        raise Exception(f'Expected 407, got {response2.status} status code')

    # Extract challenge message from server
    challenge = [val[len(scheme) + 1:] for val in response2.headers.get('Proxy-Authenticate', '').split(', ') if scheme in val]
    if len(challenge) != 1:
        raise Exception('Did not get exactly one {} challenge from server.'.format(scheme))

    # Add challenge to security buffer
    tokenbuf = win32security.PySecBufferType(pkg_info['MaxToken'], sspicon.SECBUFFER_TOKEN)
    tokenbuf.Buffer = base64.b64decode(challenge[0])
    sec_buffer.append(tokenbuf)
    _logger.debug('Got Challenge Token (NTLM)')

    # Perform next authorization step
    try:
        error, auth = clientauth.authorize(sec_buffer)
        headers = {'Proxy-Authorization': '{} {}'.format(scheme, base64.b64encode(auth[0].Buffer).decode('ASCII'))}
        _logger.debug(str(headers))
    except pywintypes.error as e:
        _logger.debug('Error calling %s: %s', e.args[1], e.args[2], exc_info=e)
        raise

    return headers
```
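The challenge-extraction step above is plain string handling and can be exercised on its own, without a proxy or SSPI. A minimal sketch (the helper name and the header value are made up for illustration):

```python
import base64

def extract_challenge(proxy_authenticate: str, scheme: str = "NTLM") -> bytes:
    # Pick the "<scheme> <base64 token>" entry out of a comma-separated
    # Proxy-Authenticate header value and return the decoded token.
    tokens = [
        val[len(scheme) + 1:]
        for val in proxy_authenticate.split(", ")
        if scheme in val
    ]
    if len(tokens) != 1:
        raise ValueError(f"Did not get exactly one {scheme} challenge")
    return base64.b64decode(tokens[0])

# "TlRMTVNTUAAC" is base64 for the "NTLMSSP" signature plus a Type 2 marker.
token = extract_challenge("Negotiate, NTLM TlRMTVNTUAAC")
print(token[:7])  # → b'NTLMSSP'
```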
Corporate proxies are often configured automatically via a PAC (Proxy Auto-Config) file, so you can use pypac to resolve the proxy and store the result in environment variables, which aiohttp picks up if you set ``trust_env=True``:
```python
if auto_proxy_config:
    import pypac

    pac = pypac.get_pac()
    if pac:
        resolver = pypac.resolver.ProxyResolver(pac)
        proxies = resolver.get_proxy_for_requests(url)
        os.environ['HTTP_PROXY'] = proxies.get('http') or ''
        os.environ['HTTPS_PROXY'] = proxies.get('https') or ''
        logger.info(f"Proxy Auto Config: HTTP:{os.environ['HTTP_PROXY']} HTTPS:{os.environ['HTTPS_PROXY']}")
```
Lastly, if you also have to make normal web requests and not just open websockets, you need a similar 407 challenge-response handler when making those requests:
```python
from aiohttp import ClientSession, ClientHttpProxyError

def get_session(self):
    if not hasattr(self, 'session'):
        # trust_env means read HTTPS_PROXY from the environment
        self.session = ClientSession(trust_env=True)
    return self.session

# ... and then when you need to make a request:
try:
    res = await self.session.post(url, json=body, proxy_headers=self.proxy_headers)
except ClientHttpProxyError as e:
    if e.status == 407:
        logger.info("Proxy 407 error occurred - starting proxy NTLM auth negotiation")
        self.proxy_headers = await get_proxy_auth_header_sspi(
            self.session,
            os.environ['HTTPS_PROXY'] if self.url.startswith("https") else os.environ['HTTP_PROXY'])
        res = await self.session.post(self.url, json=body, proxy_headers=self.proxy_headers)
    else:
        raise
```
- If you prefer callbacks over coroutines: ``websockets`` was created to provide the best coroutine-based API to manage WebSocket connections in Python. Pick another library for a callback-based API.
- If you're looking for a mixed HTTP / WebSocket library: ``websockets`` aims at being an excellent implementation of RFC 6455: The WebSocket Protocol and RFC 7692: Compression Extensions for WebSocket. Its support for HTTP is minimal: just enough for an HTTP health check.
- If you want to use Python 2: ``websockets`` builds upon asyncio, which only works on Python 3. ``websockets`` requires Python ≥ 3.6.1.
Bug reports, patches and suggestions are welcome!
To report a security vulnerability, please use the Tidelift security contact. Tidelift will coordinate the fix and disclosure.
For anything else, please open an issue or send a pull request.
Participants must uphold the Contributor Covenant code of conduct.
``websockets`` is released under the BSD license.