| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- # -*- coding: utf-8 -*-
- from __future__ import print_function, absolute_import, division
- import socket
- from struct import pack, unpack
- from functools import wraps
- from collections import namedtuple
# api_index -> (struct format, namedtuple factory); filled by @matrix_api,
# looked up by decode().
DECODER = dict()

# function name -> encoding wrapper; filled by @matrix_api, read by Sender.
ENCODER = dict()
def decode(data):
    """Decode one encoded message into the record type of its API call.

    The first byte of *data* selects the ``(format, make)`` pair stored in
    ``DECODER``; the remaining bytes are unpacked with that struct format.
    Byte-string fields are decoded as UTF-8 and stripped of trailing NUL
    characters (struct pads ``s`` fields with NULs); every other field is
    passed through unchanged.

    Args:
        data: byte string as produced by one of the registered encoders.

    Returns:
        Whatever ``make`` builds from the tuple of decoded fields (a
        namedtuple instance for APIs registered via ``matrix_api``).

    Raises:
        IndexError: *data* is empty.
        KeyError: no API is registered under the leading index byte.
        struct.error: the payload size does not match the registered format.
        UnicodeDecodeError: a text field is not valid UTF-8.
    """
    # Indexing a bytearray yields an int on both Python 2 and 3, whereas
    # ord(data[0]) fails on Python 3 where data[0] is already an int.
    api_index = bytearray(data[:1])[0]
    fmt, make = DECODER[api_index]
    # ``bytes`` is ``str`` on Python 2, so this check covers both versions.
    fields = tuple(
        value.decode('utf-8').rstrip('\0') if isinstance(value, bytes) else value
        for value in unpack(fmt, data[1:])
    )
    return make(fields)
class Sender(object):
    """UDP client that exposes every registered encoder as a send method.

    On construction, each ``name -> encoder`` entry currently in ``ENCODER``
    becomes an instance attribute ``name``. Calling it encodes the arguments
    and sends the resulting bytes as one datagram to ``(host, port)``.

    The instance can be used as a context manager; leaving the ``with``
    block closes the socket.

    Attributes:
        host: destination host name or address passed to ``sendto``.
        port: destination UDP port.
        sock: the underlying ``SOCK_DGRAM`` socket.
    """

    def __init__(self, host, port):
        """Create the UDP socket and bind one send method per encoder."""
        self.host = host
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        def send(encoder):
            # A factory is needed so each closure keeps its own encoder
            # instead of the loop variable's final value.
            def _send(*args, **kwargs):
                data = encoder(*args, **kwargs)
                print('sending %s bytes' % len(data))
                self.sock.sendto(data, (self.host, self.port))
            return _send

        # items() exists on both Python 2 and 3; iteritems() is Python 2 only.
        for name, f in ENCODER.items():
            setattr(self, name, send(f))

    def close(self):
        """Release the UDP socket; the sender must not be used afterwards."""
        self.sock.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
def matrix_api(api_index, format, names):
    """Decorator factory that registers a function as an encodable API call.

    The decorated function computes the values to transmit. The wrapper that
    replaces it returns the complete message: one byte holding *api_index*
    followed by the values packed with the struct *format*.

    Registration side effects, performed when the decorator is applied:
      * ``DECODER[api_index]`` is set to ``(format, make)`` where ``make``
        builds a namedtuple named after the function with fields *names*.
      * ``ENCODER[function name]`` is set to the wrapper.

    Args:
        api_index: integer in ``range(256)`` identifying the call.
        format: struct format string of the payload (``''`` for none). It is
            used verbatim, so struct's native byte order and alignment apply
            unless the string itself says otherwise.
        names: namedtuple field names, one per value in *format*.

    Returns:
        A decorator. The wrapper it produces accepts the same arguments as
        the decorated function and returns a byte string. The function may
        return a tuple/list of values, a single string or number (treated as
        a one-element payload), or ``None`` for a payload-less call.

    Raises:
        ValueError: *api_index* does not fit into one byte, or (at call
            time) the function returned ``None`` although *format* or
            *names* declares fields.
    """
    # Explicit raise instead of assert so the check survives ``python -O``.
    if not 0 <= api_index < 256:
        raise ValueError('api_index must be in range(256), got %r' % (api_index,))

    # pack('B', ...) gives a one-byte string on Python 2 and 3 alike, whereas
    # chr() returns text on Python 3 and cannot be joined with packed bytes.
    header = pack('B', api_index)
    # Same set as (basestring, int, float) on Python 2, but valid on
    # Python 3 too.
    scalar_types = (bytes, type(u''), int, float)

    def wrap(func):
        DECODER[api_index] = (format, namedtuple(func.__name__, names)._make)

        @wraps(func)
        def w(*args, **kwargs):
            pack_args = func(*args, **kwargs)
            if pack_args is None:
                if format or names:
                    raise ValueError(
                        '%s returned None but declares payload fields'
                        % func.__name__)
                return header
            if isinstance(pack_args, scalar_types):
                # A bare value stands for a one-field payload.
                pack_args = (pack_args,)
            return header + pack(format, *pack_args)

        ENCODER[func.__name__] = w
        return w

    return wrap
- ## API Decls
@matrix_api(1, '500s', 'text')
def print_text(text):
    """Build the payload of the ``print_text`` call (API index 1).

    *text* is sent UTF-8 encoded in a single 500-byte string field; the
    maximum safe UDP payload is 508 bytes.
    """
    payload = text.encode('utf-8')
    return payload
@matrix_api(2, '500sh', 'text loop')
def scroll_text(text, loop=1):
    """Build the payload of the ``scroll_text`` call (API index 2).

    *text* is sent UTF-8 encoded in a 500-byte string field, followed by
    *loop* packed as a short.
    """
    encoded = text.encode('utf-8')
    return (encoded, loop)
@matrix_api(3, '500shh', 'text width fade')
def fit_text(text, width=0, fade=0):
    """Build the payload of the ``fit_text`` call (API index 3).

    *text* is sent UTF-8 encoded in a 500-byte string field, followed by
    *width* and *fade*, each packed as a short.
    """
    encoded = text.encode('utf-8')
    return (encoded, width, fade)
# The format packs three values (id, step, total), so the record must declare
# three field names; with only 'step total' decoding a progress packet failed
# inside namedtuple._make with an argument-count error.
@matrix_api(10, '32shh', 'id step total')
def progress(text, id, step=0, total=0):
    """Build the payload of the ``progress`` call (API index 10).

    Args:
        text: accepted for interface compatibility but not transmitted.
            NOTE(review): confirm whether it was meant to be part of the
            packet.
        id: identifier string, sent UTF-8 encoded in a 32-byte field.
        step: packed as a short.
        total: packed as a short.

    Returns:
        ``(encoded id, step, total)`` in the order the format expects.
    """
    return id.encode('utf-8'), step, total
@matrix_api(255, '', '')
def destroy():
    """Payload-less ``destroy`` call (API index 255).

    Returning ``None`` tells the encoding wrapper to emit the index byte
    alone.
    """
    return
if __name__ == '__main__':
    # Manual smoke test: show the encoded packet, then send it over UDP.
    greeting = u'Christine ♥'
    packet = print_text(greeting)
    print(repr(packet))
    # For a local round trip without the network, inspect e.g.
    #   decode(print_text(greeting))._asdict()
    #   type(decode(print_text(greeting))).__name__
    sender = Sender('waldbeere.fritz.box', 10000)
    sender.print_text(greeting)
    sender.destroy()
|