Merge pull request #3 from eBrnd/master

smoother visualization effects
This commit is contained in:
Sebastian 2015-09-24 23:30:06 +02:00
commit 6c4c8c54c3
1 changed files with 92 additions and 56 deletions

View File

@ -6,11 +6,13 @@ import random
import time import time
import sys import sys
from twinklclient import TwinklSocket, TwinklMessage
WIDTH = 6 WIDTH = 6
HEIGHT = 8 HEIGHT = 8
AUDIO_RATE = 4000 # sampling rate in Hz AUDIO_RATE = 16000 # sampling rate in Hz
WINDOW_SIZE = 8 # average fft over WINDOW_SIZE audio frames WINDOW_SIZE = 14 # average fft over WINDOW_SIZE audio frames
BOX_MAP = [ BOX_MAP = [
[357, 18, 369, 186, 249, 228, 51], [357, 18, 369, 186, 249, 228, 51],
@ -29,41 +31,14 @@ COLORS = [
] ]
channels = {}
def set_box(x,y,r,g,b):
if x >= 0 and y >= 0 and x < WIDTH and y < HEIGHT:
base_address = BOX_MAP[y][x]
channels[base_address] = r
channels[base_address + 1] = g
channels[base_address + 2] = b
def get_box(x, y):
base_address = BOX_MAP[y][x]
try:
res = [ channels[base_address], channels[base_address + 1], channels[base_address + 2] ]
except KeyError, e:
# If the array is still uninitialized, just return [0,0,0]
res = [ 0, 0, 0 ]
return res
def output_channels():
for channel, value in channels.items():
print("%d : %d" % (channel, value))
print("")
sys.stdout.flush()
class Background: class Background:
"""clear the light wall to a pseudorandomly changing/fading solid color""" """clear the light wall to a pseudorandomly changing/fading solid color"""
def __init__(self): def __init__(self, twinkl_output):
self._current_bg_color = [ 0, 0, 0 ] self._current_bg_color = [ 0, 0, 0 ]
self._target_bg_color = [ 128, 128, 128 ] self._target_bg_color = [ 128, 128, 128 ]
self._bg_time = 0 self._bg_time = 0
self._out = twinkl_output
def clear(self): def clear(self):
@ -86,65 +61,119 @@ class Background:
for x in range(WIDTH): for x in range(WIDTH):
for y in range(HEIGHT): for y in range(HEIGHT):
color = get_box(x, y) color = self._out.get_box(x, y)
if y != HEIGHT - 1: if y != HEIGHT - 1:
color_below = get_box(x, y + 1) color_below = self._out.get_box(x, y + 1)
else: else:
color_below = self._current_bg_color color_below = self._current_bg_color
for i in range(3): for i in range(3):
# fade into BG by adding just a little of the current BG color, # fade into BG by adding just a little of the current BG color,
# add color from pixel below for a "upward flowing" effect # add color from pixel below for a "upward flowing" effect
color[i] = 0.744 * color[i] + 0.056 * self._current_bg_color[i] + 0.2 * color_below[i] color[i] = int(0.744 * color[i] + 0.056 * self._current_bg_color[i] + 0.2 * color_below[i])
set_box(x, y, color[0], color[1], color[2]) self._out.set_box(x, y, color[0], color[1], color[2])
def audio_from_raw(raw): def audio_from_raw(raw):
"""convert bytewise signed 16bit little endian to int list""" """convert bytewise signed 16bit little endian to int list, only take one stereo channel"""
out = [] out = []
high = False sample_byte = 0
current = 0 current = 0
for value in raw: for value in raw:
value = ord(value[0]) value = ord(value[0])
if high: if sample_byte == 0:
current = value
sample_byte = 1
elif sample_byte == 1:
sign = value & 0x80 sign = value & 0x80
current = current + 256 * (value & 0x7F) current = current + 256 * (value & 0x7F)
if sign: if sign:
current = -((~current & 0x7FFF) + 1) current = -((~current & 0x7FFF) + 1)
out.append(current) out.append(current)
high = False sample_byte = 2
elif sample_byte == 2: # skip other stereo channel
sample_byte = 3
else: else:
current = value sample_byte = 0
high = True
return out return out
class Twinkl_output:
"""Wrapper class for TwinklSocket/TwinklMessage that keeps a frame buffer which allows write and
read access and takes care of mapping x/y coordinates to boxes"""
def __init__(self, hostname, port, priority):
self._socket = TwinklSocket(hostname, port)
self._msg = TwinklMessage()
self._msg.set_priority(int(priority))
self._channels = {}
def set_box(self, x,y,r,g,b):
if x >= 0 and y >= 0 and x < WIDTH and y < HEIGHT:
base_address = BOX_MAP[y][x]
self._channels[base_address] = r
self._channels[base_address + 1] = g
self._channels[base_address + 2] = b
def get_box(self, x, y):
base_address = BOX_MAP[y][x]
try:
res = [
self._channels[base_address],
self._channels[base_address + 1],
self._channels[base_address + 2]
]
except KeyError, e:
# If the array is still uninitialized, just return [0,0,0]
res = [ 0, 0, 0 ]
return res
def send(self):
for _, i in enumerate(self._channels):
self._msg[i] = self._channels[i]
self._socket.send(self._msg)
class Fft_output: class Fft_output:
"""aggregate several fft'ed samples, does postprocessing to make it look nice and outputs """aggregate several fft'ed samples, does postprocessing to make it look nice and outputs
the aggregated result""" the aggregated result"""
def __init__(self, width, height, windowsize): def __init__(self, width, height, windowsize, twinkl_output):
self._background = Background() self._background = Background(twinkl_output)
self._width = width self._width = width
self._height = height self._height = height
self._windowsize = windowsize self._windowsize = windowsize
self._out = twinkl_output
self._count = 0 self._count = 0
self._data = [] self._data = [0] * width
for _ in range(width):
self._data.append(0)
def add(self, data): def add(self, data):
"""add a set of fft data to the internal store, output the result if enough data is """add a set of fft data to the internal store, output the result if enough data is
available""" available"""
abss = numpy.absolute(data[1:self._width+1])
binsize = 2
abss = numpy.absolute(data[0:binsize * self._width])
# sum fft values into bins
absum = [0] * self._width
for i in range(1, self._width * binsize):
absum[i / binsize] = absum[i / binsize] + abss[i]
for i in range(self._width): for i in range(self._width):
self._data[i] = self._data[i] + abss[i] self._data[i] = self._data[i] + absum[i]
self._count = self._count + 1 self._count = self._count + 1
if (self._count == self._windowsize): if (self._count == self._windowsize):
@ -157,18 +186,18 @@ class Fft_output:
def output_twinkl(self): def output_twinkl(self):
"""output graph to twinkl client""" """output graph to twinkl client"""
# correct for disproportionately large first and second column # correct for disproportionately large first and second column
self._data[0] = self._data[0] / 2.5 self._data[0] = self._data[0] / 2
self._data[1] = self._data[1] / 1.5 self._data[1] = self._data[1] / 2.5
self._data[2] = self._data[2] / 1.5
abss = numpy.absolute(self._data)
self._background.clear() self._background.clear()
for col in range(self._width): for col in range(self._width):
normalized = min(int(abss[col] / (self._height * self._windowsize * 1000)), self._height) normalized = min(int(self._data[col] / (self._height * self._windowsize * 700)), self._height)
color = COLORS[normalized-1] color = COLORS[normalized-1]
for row in range(self._height - normalized, self._height): for row in range(self._height - normalized, self._height):
set_box(col, row, color[0], color[1], color[2]) self._out.set_box(col, row, color[0], color[1], color[2])
output_channels()
self._out.send()
def init_audio(rate): def init_audio(rate):
@ -176,13 +205,20 @@ def init_audio(rate):
ain = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NORMAL) ain = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NORMAL)
ain.setformat(alsaaudio.PCM_FORMAT_S16_LE) ain.setformat(alsaaudio.PCM_FORMAT_S16_LE)
ain.setrate(rate) ain.setrate(rate)
ain.setperiodsize(64)
return ain return ain
def main(): def main():
if len(sys.argv) != 3:
print "Usage: %s host priority" % sys.argv[0]
sys.exit(1)
output = Twinkl_output(sys.argv[1], "1337", sys.argv[2])
ain = init_audio(AUDIO_RATE) ain = init_audio(AUDIO_RATE)
fft_out = Fft_output(WIDTH, HEIGHT, WINDOW_SIZE) fft_out = Fft_output(WIDTH, HEIGHT, WINDOW_SIZE, output)
while True: while True:
data = ain.read(); data = ain.read();