Skip to content

Commit

Permalink
Merge pull request jiaaro#172 from bolaum/master
Browse files Browse the repository at this point in the history
Add DC offset related functionality and fixes for win32.
  • Loading branch information
jiaaro authored Feb 3, 2017
2 parents a2392f8 + 21b5bbc commit 917677d
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 5 deletions.
21 changes: 21 additions & 0 deletions API.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,27 @@ samples = sound.get_array_of_samples()
new_sound = sound._spawn(samples)
```

### AudioSegment(…).get_dc_offset()

Returns a value between -1.0 and 1.0 representing the DC offset of a channel. This is calculated using `audioop.avg()` and normalizing the result by the maximum possible sample value.

**Supported keyword arguments**:

- `channel` | example: `2` | default: `1`
Selects left (1) or right (2) channel to calculate DC offset. If segment is mono, this value is ignored.

### AudioSegment(…).remove_dc_offset()

Removes DC offset from channel(s). This is done by using `audioop.bias()`, so watch out for overflows.

**Supported keyword arguments**:

- `channel` | example: `2` | default: None
Selects the left (1) or right (2) channel to remove the DC offset from. If the value is None, the offset is removed from all available channels. If the segment is mono, this value is ignored.

- `offset` | example: `-0.1` | default: None
Offset to be removed from channel(s). Calculates offset if it's None. Offset values must be between -1.0 and 1.0.

## Effects

Collection of DSP effects that are implemented by `AudioSegment` objects.
Expand Down
3 changes: 3 additions & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ Michael Bortnyck

André Cloete
github: aj-cloete

Thiago Abdnur
github: bolaum
55 changes: 55 additions & 0 deletions pydub/audio_segment.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,61 @@ def max_dBFS(self):
def duration_seconds(self):
return self.frame_rate and self.frame_count() / self.frame_rate or 0.0

def get_dc_offset(self, channel=1):
    """
    Measure the DC offset of one channel as a fraction of full scale.

    channel selects which side of a stereo segment is measured: 1 for
    left, 2 for right. For a mono segment the raw data is measured
    directly, although the channel value is still validated first.

    The mean sample value (audioop.avg) is divided by
    max_possible_amplitude, so the result lies between -1.0 and 1.0.

    Raises ValueError when channel is outside the range 1..2.
    """
    channel_is_valid = 1 <= channel <= 2
    if not channel_is_valid:
        raise ValueError("channel value must be 1 (left) or 2 (right)")

    if self.channels == 1:
        samples = self._data
    else:
        # A gain pair of (1, 0) keeps only the left samples when mixing
        # down to mono; (0, 1) keeps only the right samples.
        left_gain, right_gain = (1, 0) if channel == 1 else (0, 1)
        samples = audioop.tomono(self._data, self.sample_width,
                                 left_gain, right_gain)

    mean_sample = audioop.avg(samples, self.sample_width)
    return float(mean_sample) / self.max_possible_amplitude

def remove_dc_offset(self, channel=None, offset=None):
    """
    Return a new segment with the DC offset removed from its channel(s).

    channel: None (the default) processes every available channel; 1
        selects the left channel and 2 the right one. For a mono segment
        the value is validated but otherwise ignored.
    offset: fraction of full scale, between -1.0 and 1.0, to subtract.
        When None, the offset is measured separately for each processed
        channel with audioop.avg(). An explicit 0 is honoured as "shift
        by nothing" and does not trigger the automatic measurement.

    Raises ValueError when channel is not None, 1 or 2, or when offset
    lies outside -1.0..1.0.

    The shift is applied with audioop.bias(), which wraps around on
    overflow, so removing a large offset from a signal that is already
    near full scale can corrupt samples.
    """
    # Compare against None explicitly: 0 is an invalid channel and must
    # not be mistaken for "all channels".
    if channel is not None and channel not in (1, 2):
        raise ValueError("channel value must be None, 1 (left) or 2 (right)")

    if offset is not None:
        if not -1.0 <= offset <= 1.0:
            raise ValueError("offset value must be in range -1.0 to 1.0")
        # Convert the normalised offset into raw sample units.
        offset = int(round(offset * self.max_possible_amplitude))

    def remove_data_dc(data, off):
        # Only measure the offset when the caller did not supply one; a
        # supplied offset of 0 is a legitimate request for no shift.
        if off is None:
            off = audioop.avg(data, self.sample_width)
        return audioop.bias(data, self.sample_width, -off)

    if self.channels == 1:
        return self._spawn(data=remove_data_dc(self._data, offset))

    # Split the interleaved stereo data into two mono fragments.
    left_channel = audioop.tomono(self._data, self.sample_width, 1, 0)
    right_channel = audioop.tomono(self._data, self.sample_width, 0, 1)

    if channel is None or channel == 1:
        left_channel = remove_data_dc(left_channel, offset)

    if channel is None or channel == 2:
        right_channel = remove_data_dc(right_channel, offset)

    # Re-expand each mono fragment to stereo with the other side silent,
    # so that adding the two fragments re-interleaves the channels.
    left_channel = audioop.tostereo(left_channel, self.sample_width, 1, 0)
    right_channel = audioop.tostereo(right_channel, self.sample_width, 0, 1)

    return self._spawn(data=audioop.add(left_channel, right_channel,
                                        self.sample_width))

def apply_gain(self, volume_change):
    """
    Return a new segment whose samples are scaled by volume_change.

    volume_change is coerced to float and passed through db_to_float to
    obtain the multiplier handed to audioop.mul.
    """
    gain_factor = db_to_float(float(volume_change))
    scaled = audioop.mul(self._data, self.sample_width, gain_factor)
    return self._spawn(data=scaled)
Expand Down
4 changes: 4 additions & 0 deletions pydub/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ def mediainfo(filepath):

rgx = re.compile(r"(?:(?P<inner_dict>.*?):)?(?P<key>.*?)\=(?P<value>.*?)$")
info = {}

if sys.platform == 'win32':
output = output.replace("\r", "")

for line in output.split("\n"):
# print(line)
mobj = rgx.match(line)
Expand Down
Binary file added test/data/test-dc_offset.wav
Binary file not shown.
65 changes: 60 additions & 5 deletions test/test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from functools import partial
import mimetypes
import os
import sys
import unittest
from tempfile import NamedTemporaryFile
import struct
Expand Down Expand Up @@ -66,24 +67,27 @@ def test_audio_segment_from_mp3(self):
self.assertTrue(len(seg1) > 0)


test1wav = test4wav = test1 = test2 = test3 = testparty = None
test1wav = test4wav = test1 = test2 = test3 = testparty = testdcoffset = None


class AudioSegmentTests(unittest.TestCase):

def setUp(self):
global test1, test2, test3, testparty
global test1, test2, test3, testparty, testdcoffset
if not test1:
test1 = AudioSegment.from_mp3(os.path.join(data_dir, 'test1.mp3'))
test2 = AudioSegment.from_mp3(os.path.join(data_dir, 'test2.mp3'))
test3 = AudioSegment.from_mp3(os.path.join(data_dir, 'test3.mp3'))
testdcoffset = AudioSegment.from_mp3(
os.path.join(data_dir, 'test-dc_offset.wav'))
testparty = AudioSegment.from_mp3(
os.path.join(data_dir, 'party.mp3'))

self.seg1 = test1
self.seg2 = test2
self.seg3 = test3
self.mp3_seg_party = testparty
self.seg_dc_offset = testdcoffset

self.ogg_file_path = os.path.join(data_dir, 'bach.ogg')
self.mp4_file_path = os.path.join(data_dir, 'creative_common.mp4')
Expand Down Expand Up @@ -406,11 +410,16 @@ def test_export_forced_codec(self):
seg = self.seg1 + self.seg2

with NamedTemporaryFile('w+b', suffix='.ogg') as tmp_file:
if sys.platform == 'win32':
tmp_file.close()

seg.export(tmp_file.name, 'ogg', codec='libvorbis')
exported = AudioSegment.from_ogg(tmp_file.name)
self.assertWithinTolerance(len(exported),
len(seg),
percentage=0.01)
if sys.platform == 'win32':
os.remove(tmp_file.name)

def test_fades(self):
seg = self.seg1[:10000]
Expand Down Expand Up @@ -474,6 +483,9 @@ def test_normalize(self):
def test_for_accidental_shortening(self):
seg = self.mp3_seg_party
with NamedTemporaryFile('w+b', suffix='.mp3') as tmp_mp3_file:
if sys.platform == 'win32':
tmp_mp3_file.close()

seg.export(tmp_mp3_file.name)

for i in range(3):
Expand All @@ -482,6 +494,9 @@ def test_for_accidental_shortening(self):
tmp_seg = AudioSegment.from_mp3(tmp_mp3_file.name)
self.assertFalse(len(tmp_seg) < len(seg))

if sys.platform == 'win32':
os.remove(tmp_mp3_file.name)

def test_formats(self):
seg_m4a = AudioSegment.from_file(
os.path.join(data_dir, 'format_test.m4a'), "m4a")
Expand Down Expand Up @@ -543,7 +558,7 @@ def test_export_mp4_as_wav(self):
AudioSegment.from_file(self.mp4_file_path).export(tmp_wav_file,
format="mp3")
tmp_file_type, _ = mimetypes.guess_type(tmp_wav_file.name)
self.assertEqual(tmp_file_type, 'audio/x-wav')
self.assertEqual(tmp_file_type in ['audio/x-wav', 'audio/wav'], True)

def test_export_mp4_as_mp3_with_tags(self):
with NamedTemporaryFile('w+b', suffix='.mp3') as tmp_mp3_file:
Expand Down Expand Up @@ -581,15 +596,23 @@ def test_export_mp4_as_mp3_with_tags_raises_exception_when_id3version_is_wrong(s
def test_export_mp3_with_tags(self):
tags = {'artist': 'Mozart', 'title': 'The Magic Flute'}

with NamedTemporaryFile('w+b', suffix='.mp3') as tmp_mp3_file:
delete = sys.platform != 'win32'

with NamedTemporaryFile('w+b', suffix='.mp3', delete=delete) as tmp_mp3_file:
AudioSegment.from_file(self.mp4_file_path).export(tmp_mp3_file, format="mp3", tags=tags)

if sys.platform == 'win32':
tmp_mp3_file.close()

info = mediainfo(filepath=tmp_mp3_file.name)
info_tags = info["TAG"]

self.assertEqual(info_tags["artist"], "Mozart")
self.assertEqual(info_tags["title"], "The Magic Flute")

if sys.platform == 'win32':
os.remove(tmp_mp3_file.name)

def test_fade_raises_exception_when_duration_start_end_are_none(self):
seg = self.seg1
func = partial(seg.fade, start=1, end=1, duration=1)
Expand Down Expand Up @@ -661,11 +684,19 @@ def test_compress(self):
self.assertTrue(compressed.rms < self.seg1.rms)

def test_exporting_to_ogg_uses_default_codec_when_codec_param_is_none(self):
with NamedTemporaryFile('w+b', suffix='.ogg') as tmp_ogg_file:
delete = sys.platform != 'win32'

with NamedTemporaryFile('w+b', suffix='.ogg', delete=delete) as tmp_ogg_file:
AudioSegment.from_file(self.mp4_file_path).export(tmp_ogg_file, format="ogg")

if sys.platform == 'win32':
tmp_ogg_file.close()

info = mediainfo(filepath=tmp_ogg_file.name)

if sys.platform == 'win32':
os.remove(tmp_ogg_file.name)

self.assertEqual(info["codec_name"], "vorbis")
self.assertEqual(info["format_name"], "ogg")

Expand Down Expand Up @@ -703,6 +734,30 @@ def test_sample_array(self):
[0, 2099, 4190, 6263, 8311, 10325, 12296, 14217]
)

def test_get_dc_offset(self):
    """
    Check the measured DC offset of the fixture segment: about -0.16 for
    the default and left channel, and about 0.1 for the right channel.
    """
    segment = self.seg_dc_offset

    default_offset = segment.get_dc_offset()
    self.assertWithinTolerance(default_offset, -0.16, tolerance=0.01)

    left_offset = segment.get_dc_offset(1)
    self.assertWithinTolerance(left_offset, -0.16, tolerance=0.01)

    right_offset = segment.get_dc_offset(2)
    self.assertWithinTolerance(right_offset, 0.1, tolerance=0.01)

def test_remove_dc_offset(self):
    """
    Check remove_dc_offset for all channels, for each single channel, and
    with an explicitly supplied offset on the left channel.
    """
    source = self.seg_dc_offset

    # No arguments: both channels should end up centred on zero.
    both_fixed = source.remove_dc_offset()
    self.assertWithinTolerance(both_fixed.get_dc_offset(1), 0.0, tolerance=0.0001)
    self.assertWithinTolerance(both_fixed.get_dc_offset(2), 0.0, tolerance=0.0001)

    # Left only: the right channel keeps its original offset.
    left_fixed = source.remove_dc_offset(1)
    self.assertWithinTolerance(left_fixed.get_dc_offset(1), 0.0, tolerance=0.0001)
    self.assertWithinTolerance(left_fixed.get_dc_offset(2), 0.1, tolerance=0.01)

    # Right only: the left channel keeps its original offset.
    right_fixed = source.remove_dc_offset(2)
    self.assertWithinTolerance(right_fixed.get_dc_offset(1), -0.16, tolerance=0.01)
    self.assertWithinTolerance(right_fixed.get_dc_offset(2), 0.0, tolerance=0.0001)

    # Explicit offset: removing -0.06 from about -0.16 leaves about -0.1.
    partly_fixed = source.remove_dc_offset(channel=1, offset=(-0.06))
    self.assertWithinTolerance(partly_fixed.get_dc_offset(1), -0.1, tolerance=0.01)


class SilenceTests(unittest.TestCase):

Expand Down

0 comments on commit 917677d

Please sign in to comment.