| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- from __future__ import print_function, absolute_import, division
- print('receiver start')
- import sys
- import time
- import socket
- import traceback
- from path import Path
- from rgbmatrix import RGBMatrix, RGBMatrixOptions
- from rgbmatrix import graphics
- from PIL import Image
- def asc_desc_modulation(x, interval, clamp=0.1):
- sub_idx = x % interval
- if sub_idx < interval / 2:
- f = (x % interval) / interval
- else:
- f = (interval - x % interval) / interval
- f = max(clamp, f)
- modulate = lambda c: (int(c[0] * f), int(c[1] * f), int(c[2] * f))
- return modulate
- def Color(colorstring):
- """ convert #RRGGBB to an (R, G, B) tuple """
- colorstring = colorstring.strip()
- if colorstring[0] == '#': colorstring = colorstring[1:]
- if len(colorstring) != 6:
- raise ValueError, "input #%s is not in #RRGGBB format" % colorstring
- r, g, b = colorstring[:2], colorstring[2:4], colorstring[4:]
- r, g, b = [int(n, 16) for n in (r, g, b)]
- return (r, g, b)
- class Renderer(object):
- def __init__(self, textColor=(255, 255, 255), brightness=255):
- print('initializing renderer')
- self._color = textColor
- self.setup()
- self._brightness = brightness
- self.set_brightness(brightness)
- def set_brightness(self, value):
- self._brightness = value
- self.matrix.brightness = value
- def setup(self):
- o = RGBMatrixOptions()
- o.chain_length = 4
- self.matrix = RGBMatrix(options=o)
- self.canvas = self.matrix.CreateFrameCanvas()
- self.color = graphics.Color(*self._color)
- def destroy(self):
- del self.canvas
- del self.matrix
- del self.color
- def swap(self):
- self.canvas = self.matrix.SwapOnVSync(self.canvas)
- self.canvas.Clear()
- def progress(self, step=0, total=0):
- c = self.canvas
- w, h = c.width, c.height
- col = Color('#AACCFF')
- if total > 0:
- padding = ' ' * (len(str(total-1)) - len(str(step+1)))
- s_len = graphics.DrawText(c, fonts['10x20'], 0, 20, self.color, '%s%s' % (padding, step+1))
- w = w - s_len
- stepsize = max(1, int(w / total))
- for x in range(w):
- modulate = asc_desc_modulation(x, stepsize)
- for y in range(h):
- c.SetPixel(s_len + x, y, *modulate(col))
- if x > ((step + 1) / total) * w:
- break
- self.swap()
- def print_text(self, text):
- graphics.DrawText(self.canvas, fonts['10x20'], 0, 20, self.color, text)
- self.swap()
- def scroll_text(self, text, loop=1):
- pos = self.canvas.width
- i = 1
- while True:
- len = graphics.DrawText(self.canvas, fonts['10x20'], pos, 20, self.color, text)
- pos -= 1
- if (pos + len < 0):
- pos = self.canvas.width
- i += 1
- if i > loop:
- break
- time.sleep(0.02)
- self.swap()
- def fit_text(self, text, width=None, fade=None):
- if width is None:
- width = self.canvas.width
- ff = reversed(sorted(fonts.items(), key=lambda e: int(e[0].split('x')[0])))
- temp_canvas = self.matrix.CreateFrameCanvas()
- for name, font in ff:
- w = graphics.DrawText(temp_canvas, font, 0, -100, self.color, text)
- if w < width:
- break
- else:
- print('str len %s exceeds space' % w)
- if fade:
- t = time.time()
- while time.time() < t + fade:
- x = 1 - ((time.time() - t) / fade)
- #~ self.matrix.brightness = self._brightness * x
- c = graphics.Color(int(self.color.red * x), int(self.color.green * x), int(self.color.blue * x))
- graphics.DrawText(self.canvas, font, 0, 20, c, text)
- self.swap()
- time.sleep(0.02)
- else:
- graphics.DrawText(self.canvas, font, 0, 20, self.color, text)
- self.swap()
- try:
- fonts = {}
- fo = graphics.Font()
- fo.LoadFont('/matrix/fonts/10x20.bdf')
- fonts['10x20'] = fo
- r = Renderer(textColor=(96, 128, 255))
- print('loading fonts')
- files = Path('/matrix/fonts').files('*.bdf')
- for i, f in enumerate(files):
- #~ if i % 3 != 0:
- #~ continue
- r.progress(i, len(files))
- if not f.namebase.split('x')[0].isdigit() or not f.namebase[-1].isdigit():
- continue
- fo = graphics.Font()
- fo.LoadFont(f)
- fonts[f.namebase] = fo
- r.progress(0, 0)
- # Create a TCP/IP socket
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- # Bind the socket to the port
- server_address = ('0.0.0.0', 10000)
- print('starting up on %s port %s' % server_address)
- sock.bind(server_address)
- r.fit_text('Matrix ready.', fade=4)
- while True:
- try:
- print('waiting to receive message')
- data, address = sock.recvfrom(4096)
- method = ord(data[0])
- print('received %s bytes from %s [method: %s]' % (len(data), address, method))
- r.set_brightness(255)
- data = data[1:]
- if method == 0:
- r.print_text(data.decode('utf-8'))
- elif method == 1:
- r.scroll_text(data.decode('utf-8'))
- elif method == 2:
- r.fit_text(data.decode('utf-8'))
- elif method == 10:
- r.progress(ord(data[0]), ord(data[1]))
- elif method == 255:
- # kill SwapOnVSync
- r.destroy()
- # XXX: how to re-setup
- #~ r.setup()
- except Exception:
- print(traceback.format_exc())
- except Exception:
- print(traceback.format_exc())
- finally:
- print('receiver exit')
|