PK btCG4O 4O mmstats-latest/history.html
Copyright 2012 Urban Airship and contributors
m | ||
mmstats | ||
mmstats._mmap | ||
mmstats.defaults | ||
mmstats.fields | ||
mmstats.models | ||
mmstats.reader |
Documentation | Package | Code
Not under active development
Mmstats is a way to expose and read diagnostic values and metrics for applications.
Think of mmstats as /proc for your application and the readers as procps utilities.
This project is a Python implementation, but compatible implementations can be made in any language (see Goals).
Discuss at https://groups.google.com/group/python-introspection
CPython 2.6 or 2.7 (Windows is untested)
PyPy (only tested in 1.7, should be faster in 1.8)
import mmstats
class WebStats(mmstats.MmStats):
status2xx = mmstats.CounterField(label='status.2XX')
status3xx = mmstats.CounterField(label='status.3XX')
status4xx = mmstats.CounterField(label='status.4XX')
status5xx = mmstats.CounterField(label='status.5XX')
last_hit = mmstats.DoubleField(label='timers.last_hit')
webstats = WebStats(label_prefix='web.stats.')
if response.status_code == 200:
webstats.status2xx.inc()
webstats.last_hit = time.time()
Mmstats is a way to expose and read diagnostic values and metrics for applications.
Think of mmstats as /proc for your application and the readers as procps utilities.
This project is a Python implementation, but compatible implementations can be made in any language (see Goals).
Discuss at https://groups.google.com/group/python-introspection
CPython 2.6 or 2.7 (Windows is untested)
PyPy (only tested in 1.7, should be faster in 1.8)
import mmstats
class WebStats(mmstats.MmStats):
status2xx = mmstats.CounterField(label='status.2XX')
status3xx = mmstats.CounterField(label='status.3XX')
status4xx = mmstats.CounterField(label='status.4XX')
status5xx = mmstats.CounterField(label='status.5XX')
last_hit = mmstats.DoubleField(label='timers.last_hit')
webstats = WebStats(label_prefix='web.stats.')
if response.status_code == 200:
webstats.status2xx.inc()
webstats.last_hit = time.time()
import collections
import ctypes
import ctypes.util
import errno
# Load the C library with use_errno=True so that ctypes keeps a copy of the C
# `errno` after every foreign call. Without it ctypes.get_errno(), which the
# error paths of mmap()/msync()/munmap() rely on, never sees the real error
# number.
libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
import mmap as stdlib_mmap
import os
PAGESIZE = stdlib_mmap.PAGESIZE
MmapInfo = collections.namedtuple('MmapInfo', ('fd', 'size', 'pointer'))
def init_mmap(filename, size=PAGESIZE):
    """Create an mmap given a location `filename` and minimum `size` in bytes

    The backing file is created (or truncated if it already exists), resized
    to a whole number of pages and then mapped with :func:`mmap`.

    :param filename: path of the file that backs the memory map
    :param size: minimum size of the mapping in bytes
    :returns: an MmapInfo tuple with the file descriptor, actual size, and a
        pointer to the beginning of the mmap
    :raises OSError: if the file cannot be created, resized or mapped

    Note that the size returned is rounded up to the nearest PAGESIZE.
    """
    # Create new empty file to back memory map on disk
    fd = os.open(filename, os.O_CREAT | os.O_TRUNC | os.O_RDWR)
    try:
        if size > PAGESIZE:
            remainder = size % PAGESIZE
            if remainder:
                # Round up to the next page boundary
                size += PAGESIZE - remainder
        else:
            size = PAGESIZE

        # Zero out the file
        os.ftruncate(fd, size)

        m_ptr = mmap(size, fd)
    except Exception:
        # Nobody else knows about the descriptor yet, so release it here
        # instead of leaking it when sizing or mapping fails.
        os.close(fd)
        raise
    return MmapInfo(fd, size, m_ptr)
# Linux consts from /usr/include/bits/mman.h
MS_ASYNC = 1
MS_SYNC = 4

# C prototype: void *mmap(void *addr, size_t length, int prot, int flags,
#                         int fd, off_t offset)
libc.mmap.restype = ctypes.c_void_p
libc.mmap.argtypes = [
    ctypes.c_void_p,  # address
    ctypes.c_size_t,  # size of mapping
    ctypes.c_int,  # protection
    ctypes.c_int,  # flags
    ctypes.c_int,  # fd
    # offset: off_t is a C long for the plain `mmap` symbol on Linux, which
    # is wider than int on 64bit platforms, so declare it as c_long.
    ctypes.c_long,
]
def mmap(size, fd):
    """Map `size` bytes of the file open on `fd` as shared read/write memory

    :param size: length of the mapping in bytes
    :param fd: descriptor of the file backing the mapping
    :returns: address of the start of the mapping as an integer
    :raises OSError: if the libc ``mmap`` call fails
    """
    m_ptr = libc.mmap(
        None,
        size,
        stdlib_mmap.PROT_READ | stdlib_mmap.PROT_WRITE,
        stdlib_mmap.MAP_SHARED,
        fd,
        0,
    )
    # mmap(2) reports failure as MAP_FAILED, i.e. (void *) -1. ctypes returns
    # c_void_p results as *unsigned* integers, so the sentinel has to be
    # converted the same way; a plain comparison with -1 never matches.
    map_failed = ctypes.c_void_p(-1).value
    if m_ptr == map_failed or m_ptr == -1:
        e = ctypes.get_errno()
        # .get() keeps an unknown/zero errno from masking the failure with a
        # KeyError.
        raise OSError(e, errno.errorcode.get(e, 'unknown error'))
    return m_ptr
libc.msync.restype = ctypes.c_int
libc.msync.argtypes = [
ctypes.c_void_p, # address
ctypes.c_size_t, # size of mapping
ctypes.c_int, # flags
]
def msync(mm_ptr, size, async=False):
if async:
flags = MS_ASYNC
else:
flags = MS_SYNC
status = libc.msync(mm_ptr, size, flags)
if status == -1:
e = ctypes.get_errno()
raise OSError(e, errno.errorcode[e])
# C prototype: int munmap(void *addr, size_t length)
libc.munmap.argtypes = [
    ctypes.c_void_p,  # address
    ctypes.c_size_t,  # size of mapping
]
libc.munmap.restype = ctypes.c_int


def munmap(mm_ptr, size):
    """Remove the mapping of `size` bytes starting at `mm_ptr`

    :param mm_ptr: address of the mapping
    :param size: length of the mapping in bytes
    :raises OSError: if the libc ``munmap`` call returns -1
    """
    if libc.munmap(mm_ptr, size) == -1:
        err = ctypes.get_errno()
        raise OSError(err, errno.errorcode[err])
"""mmstats reader implementation"""
from collections import namedtuple
import mmap
import struct
VERSION_1 = '\x01'
UNBUFFERED_FIELD = 255
def reader(fmt):
    """Build a function that reads one `fmt` encoded value from a file

    :param fmt: a :mod:`struct` format string
    :returns: a callable taking a file-like object; it consumes exactly the
        number of bytes `fmt` occupies and returns the first unpacked value
    """
    compiled = struct.Struct(fmt)
    nbytes = compiled.size

    def wrapper(v):
        raw = v.read(nbytes)
        return compiled.unpack(raw)[0]

    # Give every generated reader a recognisable name
    wrapper.__name__ = 'unpack_' + fmt
    return wrapper


# Readers for the fixed-size header values of a stat record
read_ushort = reader('H')
read_ubyte = reader('B')

# A single decoded stat: its label and current value
Stat = namedtuple('Stat', ('label', 'value'))
class MmStatsReader(object):
    """Reads the stats stored in an mmstats file

    Iterating over a reader yields one ``Stat(label, value)`` tuple per
    record found in the underlying data. Each record consists of a 2 byte
    label size, the UTF-8 label, a 2 byte type signature size, the
    :mod:`struct` type signature, a 1 byte write buffer index and finally
    either a single value (index ``UNBUFFERED_FIELD``) or two buffers.
    """

    def __init__(self, data):
        """`data` should be a file-like object (mmap or file)

        :raises InvalidMmStatsVersion: if the leading version byte is not a
            supported version
        """
        self.data = data
        rawver = self.data.read(1)
        if rawver == VERSION_1:
            self.version = 1
        else:
            raise InvalidMmStatsVersion(repr(rawver))

    @classmethod
    def _from_source(cls, source):
        """Wrap `source` in a reader, closing it if the header is rejected"""
        try:
            return cls(source)
        except Exception:
            source.close()
            raise

    @classmethod
    def from_file(cls, fn):
        """Create a reader on top of the regular file named `fn`"""
        return cls._from_source(open(fn, 'rb'))

    @classmethod
    def from_mmap(cls, fn):
        """Create a reader on top of a read-only memory map of file `fn`"""
        f = open(fn, 'rb')
        try:
            # `access` is the portable way to request a read-only mapping;
            # ACCESS_READ is not meant to be passed as `prot`.
            mmapf = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        finally:
            # The mapping keeps its own duplicate of the descriptor, so the
            # file object is not needed once mmap() returned (or failed).
            f.close()
        return cls._from_source(mmapf)

    def __iter__(self):
        """Yield a ``Stat`` for every record, then close the data source"""
        d = self.data
        while True:
            raw_label_sz = d.read(2)
            if len(raw_label_sz) < 2 or raw_label_sz == b'\x00\x00':
                # EOF: out of data (including a truncated size) or reached
                # the zero padding after the last record
                break
            label_sz = struct.unpack('H', raw_label_sz)[0]
            label = d.read(label_sz).decode('utf8', 'ignore')
            type_sz = read_ushort(d)
            type_ = d.read(type_sz)
            sz = struct.calcsize(type_)
            buf_idx = read_ubyte(d)
            if buf_idx == UNBUFFERED_FIELD:
                value = struct.unpack(type_, d.read(sz))[0]
            else:
                # Flip bit as the stored buffer is the *write* buffer
                buf_idx ^= 1
                buffers = d.read(sz * 2)
                offset = sz * buf_idx
                read_buffer = buffers[offset:(offset + sz)]
                value = struct.unpack(type_, read_buffer)[0]
            if isinstance(value, bytes):
                # Special case strings as they're \x00 padded
                value = value.split(b'\x00', 1)[0].decode('utf8', 'ignore')
            yield Stat(label, value)
        try:
            d.close()
        except Exception:
            # Don't worry about exceptions closing the file
            pass
import ctypes
import glob
import os
import sys
import time
import threading
from . import fields, libgettid, _mmap
from .defaults import DEFAULT_PATH, DEFAULT_FILENAME
removal_lock = threading.Lock()
def _expand_filename(path=DEFAULT_PATH, filename=DEFAULT_FILENAME):
    """Build the full path of an mmap file from `path` and `filename`.

    Both arguments are templates; ``{CMD}``, ``{PID}`` and ``{TID}`` are
    replaced before the two parts are joined.

    :param path: path to store mmaped files
    :param filename: filename template for mmaped files
    :returns: fully expanded path and filename
    :rtype: str

    Substitutions documented in :class:`~mmstats.models.BaseMmStats`
    """
    values = dict(
        CMD=os.path.basename(sys.argv[0]),
        PID=os.getpid(),
        TID=libgettid.gettid(),
    )

    # Expand the filename first, then the directory, then combine them
    expanded_name = filename.format(**values)
    expanded_dir = path.format(**values)
    return os.path.join(expanded_dir, expanded_name)
class FieldState(object):
    """Per-instance container for the state of a single Field

    Only the owning field is stored on creation; further attributes are
    attached to the state object later on.
    """

    def __init__(self, field):
        # The Field instance this state belongs to
        self.field = field
[docs]class BaseMmStats(threading.local):
"""Stats models should inherit from this
Optionally given a filename or label_prefix, create an MmStats instance
Both `filename` and `path` support the following variable substiutions:
* `{CMD}` - name of application (`os.path.basename(sys.argv[0])`)
* `{PID}` - process's PID (`os.getpid()`)
* `{TID}` - thread ID (tries to get it via the `SYS_gettid` syscall but
fallsback to the Python/pthread ID or 0 for truly broken platforms)
This class is *not threadsafe*, so you should include both {PID} and
{TID} in your filename to ensure the mmaped files don't collide.
"""
def __init__(self, path=DEFAULT_PATH, filename=DEFAULT_FILENAME,
label_prefix=None):
self._removed = False
# Setup label prefix
self._label_prefix = '' if label_prefix is None else label_prefix
self._full_path = _expand_filename(path, filename)
self._filename = filename
self._path = path
self._offset = 1
# Store state for this instance's fields
self._fields = {}
total_size = self._offset
#FIXME This is the *wrong* way to initialize stat fields
for cls in self.__class__.__mro__:
for attrname, attrval in cls.__dict__.items():
if (attrname not in self._fields
and isinstance(attrval, fields.Field)):
total_size += self._add_field(attrname, attrval)
self._fd, self._size, self._mm_ptr = _mmap.init_mmap(
self._full_path, size=total_size)
mmap_t = ctypes.c_char * self._size
self._mmap = mmap_t.from_address(self._mm_ptr)
ver = ctypes.c_byte.from_address(self._mm_ptr)
ver.value = 1 # Version number
# Finally initialize thes stats
self._init_fields(total_size)
def _add_field(self, name, field):
"""Given a name and Field instance, add this field and retun size"""
# Stats need a place to store their per Mmstats instance state
state = self._fields[name] = FieldState(field)
# Call field._new to determine size
return field._new(state, self.label_prefix, name)
def _init_fields(self, total_size):
"""Once all fields have been added, initialize them in mmap"""
for state in self._fields.values():
# 2nd Call field._init to initialize new stat
self._offset = state.field._init(state, self._mm_ptr, self._offset)
@property
def filename(self):
return self._full_path
@property
def label_prefix(self):
return self._label_prefix
@property
def size(self):
return self._size
[docs] def flush(self, async=False):
"""Flush mmapped file to disk
:param async: ``True`` means the call won't wait for the flush to
finish syncing to disk. Defaults to ``False``
:type async: bool
"""
_mmap.msync(self._mm_ptr, self._size, async)
def remove(self):
with removal_lock:
# Perform regular removal of this process/thread's own file.
self._remove()
# Then ensure we clean up any forgotten thread-related files, if
# applicable. Ensure {PID} exists, for safety's sake - better to not
# cleanup than to cleanup multiple PIDs' files.
if '{PID}' in self._filename and '{TID}' in self._filename:
self._remove_stale_thread_files()
def _remove_stale_thread_files(self):
# The originally given (to __init__) filename string, containing
# expansion hints, is preserved in _filename. If it contains {TID} we
# can replace that with a glob expression to catch all TIDS for our
# given PID.
globbed = self._filename.replace('{TID}', '*')
# Re-expand any non-{TID} expansion hints
expanded = _expand_filename(path=self._path, filename=globbed)
# And nuke as appropriate.
for leftover in glob.glob(expanded):
os.remove(leftover)
def _remove(self):
"""Close and remove mmap file - No further stats updates will work"""
if self._removed:
# Make calling more than once a noop
return
_mmap.munmap(self._mm_ptr, self._size)
self._size = None
self._mm_ptr = None
self._mmap = None
os.close(self._fd)
try:
os.remove(self.filename)
except OSError:
# Ignore failed file removals
pass
# Remove fields to prevent segfaults
self._fields = {}
self._removed = True
class MmStats(BaseMmStats):
    """Default model base class with a set of predefined static fields

    Subclass it, declare your own fields, and create an instance:

    >>> from mmstats.models import MmStats
    >>> from mmstats.fields import CounterField
    >>> class MyStats(MmStats):
    ...     errors = CounterField()
    ...
    >>> stats = MyStats()
    >>> stats.errors.incr()
    >>> stats.errors.value
    1L
    """

    # Identity of the process/thread owning the stats
    pid = fields.StaticUIntField(label="sys.pid", value=os.getpid)
    tid = fields.StaticInt64Field(label="sys.tid", value=libgettid.gettid)
    uid = fields.StaticUInt64Field(label="sys.uid", value=os.getuid)
    gid = fields.StaticUInt64Field(label="sys.gid", value=os.getgid)

    # Interpreter version with newlines stripped
    python_version = fields.StaticTextField(
        label="org.python.version",
        value=lambda: sys.version.replace("\n", ""))

    # Timestamp taken from time.time
    created = fields.StaticDoubleField(label="sys.created", value=time.time)
import array
import ctypes
import math
import time
import warnings
from . import defaults
# >=2.7 ignores DeprecationWarning by default, mimic that behavior here
warnings.filterwarnings('ignore', category=DeprecationWarning)
class DuplicateFieldName(Exception):
    """Two fields with the same name cannot be added to MmStat instances"""
def _create_struct(label, type_, type_signature, buffers=None):
    """Create the packed ctypes Structure subclass describing one field

    :param label: field label; unicode labels are encoded as UTF-8
    :param type_: ctypes type of the stored value
    :param type_signature: type signature stored next to the value
    :param buffers: number of value buffers, or ``None`` for a single
        unbuffered ``value`` member
    :returns: a new :class:`ctypes.Structure` subclass
    """
    if isinstance(label, unicode):
        label = label.encode('utf8')

    # Header shared by all fields
    layout = [
        ('label_sz', defaults.SIZE_TYPE),
        ('label', ctypes.c_char * len(label)),
        ('type_sig_sz', defaults.SIZE_TYPE),
        ('type_signature', ctypes.c_char * len(type_signature)),
        ('write_buffer', ctypes.c_ubyte),
    ]

    # Payload: one plain value, or an array of `buffers` values
    if buffers is not None:
        layout.append(('buffers', (type_ * buffers)))
    else:
        layout.append(('value', type_))

    namespace = {'_fields_': layout, '_pack_': 1}
    return type("%sStruct" % label.title(), (ctypes.Structure,), namespace)
class Field(object):
    """Base class of all fields

    A field instance lives on the model class; everything that is specific
    to one model instance is kept on the `state` object handed to
    :meth:`_new` and :meth:`_init`.
    """

    # Value written into the mmap when the field is initialized
    initial = 0

    def __init__(self, label=None):
        self._struct = None  # initialized in _init
        if label:
            self.label = label
        else:
            self.label = None

    def _new(self, state, label_prefix, attrname, buffers=None):
        """Creates new data structure for field in state instance

        :param state: per-instance state object of this field
        :param label_prefix: prefix put in front of the label
        :param attrname: attribute name the field is bound to
        :param buffers: number of value buffers (``None`` for unbuffered)
        :returns: size in bytes of the field's structure
        """
        # Key is used to reference field state on the parent instance
        self.key = attrname

        # Label defaults to attribute name if no label specified
        if self.label is None:
            label = label_prefix + attrname
        else:
            label = label_prefix + self.label

        # Keep the label as UTF-8 bytes. The structure is sized from the
        # encoded label, so the size and the bytes written in _init have to
        # come from the very same encoded value.
        if isinstance(label, unicode):
            label = label.encode('utf8')
        state.label = label

        state._StructCls = _create_struct(
            state.label, self.buffer_type,
            self.type_signature, buffers)
        state.size = ctypes.sizeof(state._StructCls)
        return state.size

    def _init(self, state, mm_ptr, offset):
        """Initializes value of field's data structure

        :param state: per-instance state object prepared by :meth:`_new`
        :param mm_ptr: address of the start of the mmap
        :param offset: offset of this field inside the mmap
        :returns: offset directly after this field
        """
        state._struct = state._StructCls.from_address(mm_ptr + offset)
        state._struct.label_sz = len(state.label)
        state._struct.label = state.label
        state._struct.type_sig_sz = len(self.type_signature)
        state._struct.type_signature = self.type_signature
        state._struct.write_buffer = defaults.WRITE_BUFFER_UNUSED
        state._struct.value = self.initial
        return offset + ctypes.sizeof(state._StructCls)

    @property
    def type_signature(self):
        """Type code of `buffer_type`, used unless a subclass overrides it"""
        return self.buffer_type._type_

    def __repr__(self):
        return '%s(label=%r)' % (self.__class__.__name__, self.label)
class NonDataDescriptorMixin(object):
    """Mixin providing ``__get__`` for single buffered fields"""

    def __get__(self, inst, owner):
        # Class level access returns the field itself
        if inst is None:
            return self
        state = inst._fields[self.key]
        return state._struct.value
class DataDescriptorMixin(object):
    """Mixin providing ``__set__`` for single buffered fields"""

    def __set__(self, inst, value):
        state = inst._fields[self.key]
        state._struct.value = value
class BufferedDescriptorMixin(object):
    """\
    Mixin providing the descriptor methods of double buffered fields

    It always offers both reading and writing, as double buffering doesn't
    make sense for readonly fields
    """

    def __get__(self, inst, owner):
        # Class level access returns the field itself
        if inst is None:
            return self
        struct_ = inst._fields[self.key]._struct
        # The read buffer is the one that is *not* the write buffer
        read_idx = struct_.write_buffer ^ 1
        return struct_.buffers[read_idx]

    def __set__(self, inst, value):
        struct_ = inst._fields[self.key]._struct
        # Store into the write buffer ...
        struct_.buffers[struct_.write_buffer] = value
        # ... then swap buffers so the new value becomes readable
        struct_.write_buffer ^= 1
class ReadOnlyField(Field, NonDataDescriptorMixin):
    """Base class for fields whose value is written once at initialization

    :param label: optional label of the field
    :param value: the static value, or a callable returning it
    """

    def __init__(self, label=None, value=None):
        super(ReadOnlyField, self).__init__(label=label)
        self.value = value

    def _init(self, state, mm, offset):
        """Initialize the field and store its static value

        :raises ValueError: if no value was configured
        """
        if self.value is None:
            # Value can't be None
            raise ValueError("value must be set")

        # Resolve callables into a local on every initialization. The field
        # object is shared by all instances of a model, so overwriting
        # self.value would freeze the first result (e.g. the first thread's
        # id) for every instance created afterwards.
        value = self.value
        if callable(value):
            value = value()

        # Call super to do standard initialization
        new_offset = super(ReadOnlyField, self)._init(state, mm, offset)
        # Set the static field now
        state._struct.value = value
        # And return the offset as usual
        return new_offset
class ReadWriteField(Field, NonDataDescriptorMixin, DataDescriptorMixin):
    """Base class of plain (single buffered) fields that can be written"""
class DoubleBufferedField(Field):
    """Base class of writable fields that keep two value buffers"""

    def _new(self, state, label_prefix, attrname):
        # Same as Field._new, but always with two buffers
        return super(DoubleBufferedField, self)._new(
            state, label_prefix, attrname, buffers=2)

    def _init(self, state, mm_ptr, offset):
        """Write the field header and zero both buffers

        :returns: offset directly after this field
        """
        struct_cls = state._StructCls
        state._struct = struct_cls.from_address(mm_ptr + offset)

        # Header: label and type signature, each preceded by its length
        state._struct.label_sz = len(state.label)
        state._struct.label = state.label
        state._struct.type_sig_sz = len(self.type_signature)
        state._struct.type_signature = self.type_signature

        # Start writing into buffer 0 with both buffers zeroed
        state._struct.write_buffer = 0
        state._struct.buffers = (0, 0)
        return offset + ctypes.sizeof(struct_cls)
class ComplexDoubleBufferedField(DoubleBufferedField):
    """Base class of fields exposing an internal helper object

    Attribute access on a model instance returns an ``InternalClass``
    instance (e.g. a counter) instead of a plain value.

    Set InternalClass in your subclass
    """

    InternalClass = None

    def _init(self, state, mm_ptr, offset):
        next_offset = super(ComplexDoubleBufferedField, self)._init(
            state, mm_ptr, offset)
        # The helper needs the struct created by the call above
        self._init_internal(state)
        return next_offset

    def _init_internal(self, state):
        """Create the InternalClass instance for `state`"""
        factory = self.InternalClass
        if factory is None:
            raise NotImplementedError(
                "Must set %s.InternalClass" % type(self).__name__)
        state.internal = factory(state)

    def __get__(self, inst, owner):
        # Class level access returns the field itself
        if inst is None:
            return self
        state = inst._fields[self.key]
        return state.internal
class _InternalFieldInterface(object):
"""Base class used by internal field interfaces like counter"""
def __init__(self, state):
self._struct = state._struct
@property
def value(self):
return self._struct.buffers[self._struct.write_buffer ^ 1]
@value.setter
def value(self, v):
self._set(v)
def _set(self, v):
# Set the write buffer
self._struct.buffers[self._struct.write_buffer] = v
# Swap the write buffer
self._struct.write_buffer ^= 1
class CounterField(ComplexDoubleBufferedField):
    """Counter exposing ``incr()`` (plus deprecated ``inc()``) and ``value``"""

    buffer_type = ctypes.c_uint64
    type_signature = 'Q'

    class InternalClass(_InternalFieldInterface):
        """Helper object handed out for CounterField attributes"""

        def incr(self, amount=1):
            """Increment Counter by `amount` (defaults to 1)"""
            current = self.value
            self._set(current + amount)

        def inc(self, n=1):
            """Deprecated alias of :meth:`incr`"""
            warnings.warn(
                "inc(n=...) is deprecated. Use incr(amount=...)",
                DeprecationWarning
            )
            self.incr(n)
class AverageField(ComplexDoubleBufferedField):
    """Field holding the mean of all values passed to ``add()``"""

    buffer_type = ctypes.c_double

    class InternalClass(_InternalFieldInterface):
        """Helper object keeping the running mean of an AverageField"""

        def __init__(self, state):
            _InternalFieldInterface.__init__(self, state)
            # Number of samples and their sum; both are needed to
            # recalculate the mean when a new sample arrives
            self._count = 0
            self._total = 0.0

        def add(self, value):
            """Add a new value to the average"""
            self._count += 1
            self._total += value
            mean = self._total / self._count
            self._set(mean)
class _MovingAverageInternal(_InternalFieldInterface):
    """Helper object keeping the mean over a fixed window of samples

    The window length is taken from the ``size`` attribute of the field.
    """

    def __init__(self, state):
        _InternalFieldInterface.__init__(self, state)
        self._max = state.field.size
        # Ring buffer of the most recent samples
        self._window = array.array('d', [0.0] * self._max)
        self._idx = 0
        self._full = False

    def add(self, value):
        """Add a new value to the moving average"""
        self._window[self._idx] = value

        # Until the window wrapped around once only the slots up to the
        # current index hold samples; the rest are still 0.0
        samples = self._max if self._full else self._idx + 1
        self._set(math.fsum(self._window) / samples)

        # Advance the ring buffer position
        if self._idx != (self._max - 1):
            self._idx += 1
        else:
            self._idx = 0
            self._full = True
class MovingAverageField(ComplexDoubleBufferedField):
    """Field holding the mean of the last `size` values passed to ``add()``

    :param size: number of samples in the window (defaults to 100)
    """

    InternalClass = _MovingAverageInternal
    buffer_type = ctypes.c_double

    def __init__(self, size=100, **kwargs):
        # Remaining keyword arguments are handed to the base class
        super(MovingAverageField, self).__init__(**kwargs)
        self.size = size
class _TimerContext(object):
"""Class to wrap timer state"""
def __init__(self, timer=time.time):
self._timer = timer
self.start = timer()
self.end = None
def get_time(self):
return self._timer()
@property
def done(self):
"""True if timer context has stopped"""
return self.end is not None
@property
def elapsed(self):
"""Returns time elapsed in context"""
if self.done:
return self.end - self.start
else:
return self.get_time() - self.start
def stop(self):
self.end = self.get_time()
class TimerField(MovingAverageField):
    """Moving average of timings, usable as a context manager

    :param timer: callable returning the current time (default
        ``time.time``)

    As a context manager:

    >>> class T(MmStats):
    ...     timer = TimerField()
    >>> t = T()
    >>> with t.timer as ctx:
    ...     assert ctx.elapsed > 0.0
    >>> assert t.timer.value > 0.0
    >>> assert t.timer.last > 0.0
    """

    def __init__(self, timer=time.time, **kwargs):
        super(TimerField, self).__init__(**kwargs)
        self.timer = timer

    class InternalClass(_MovingAverageInternal):
        """Helper object handed out for TimerField attributes"""

        def __init__(self, state):
            _MovingAverageInternal.__init__(self, state)
            # Context of the most recent timing, None before the first one
            self._ctx = None
            self.timer = state.field.timer

        def start(self):
            """Start the timer"""
            self._ctx = _TimerContext(self.timer)

        def stop(self):
            """Stop the timer"""
            ctx = self._ctx
            ctx.stop()
            # Feed the finished timing into the moving average
            self.add(ctx.elapsed)

        def __enter__(self):
            self.start()
            return self._ctx

        def __exit__(self, exc_type, exc_value, exc_tb):
            self.stop()

        @property
        def last(self):
            """Get the last recorded value"""
            ctx = self._ctx
            return 0.0 if ctx is None else ctx.elapsed
class BufferedDescriptorField(DoubleBufferedField, BufferedDescriptorMixin):
    """Base class of double buffered fields accessed as plain attributes"""
class UInt64Field(BufferedDescriptorField):
    """64bit Double Buffered Unsigned Integer field"""

    buffer_type = ctypes.c_uint64
    # Explicit struct code for an unsigned 64bit value
    type_signature = 'Q'
class UIntField(BufferedDescriptorField):
    """Double buffered field holding an unsigned 32bit integer"""

    type_signature = 'I'
    buffer_type = ctypes.c_uint32
class IntField(BufferedDescriptorField):
    """Double buffered field holding a signed 32bit integer"""

    type_signature = 'i'
    buffer_type = ctypes.c_int32
class ShortField(BufferedDescriptorField):
    """Double buffered field holding a signed 16bit integer"""

    # No explicit type_signature: the type code of buffer_type is used
    buffer_type = ctypes.c_int16
class UShortField(BufferedDescriptorField):
    """Double buffered field holding an unsigned 16bit integer"""

    # No explicit type_signature: the type code of buffer_type is used
    buffer_type = ctypes.c_uint16
class FloatField(BufferedDescriptorField):
    """Double buffered field holding a 32bit float"""

    # No explicit type_signature: the type code of buffer_type is used
    buffer_type = ctypes.c_float
class StaticFloatField(ReadOnlyField):
    """Read-only, unbuffered field holding a 32bit float"""

    # No explicit type_signature: the type code of buffer_type is used
    buffer_type = ctypes.c_float
class DoubleField(BufferedDescriptorField):
    """Double buffered field holding a 64bit double precision float"""

    # No explicit type_signature: the type code of buffer_type is used
    buffer_type = ctypes.c_double
class StaticDoubleField(ReadOnlyField):
    """Read-only, unbuffered field holding a 64bit float"""

    # No explicit type_signature: the type code of buffer_type is used
    buffer_type = ctypes.c_double
class BoolField(ReadWriteField):
    """Field holding a boolean

    :param initial: value the field starts with (defaults to ``False``)
    """

    # Booleans are stored as a byte holding 0 or 1 and converted by hand to
    # avoid potential ambiguity
    buffer_type = ctypes.c_byte
    type_signature = '?'

    def __init__(self, initial=False, **kwargs):
        self.initial = initial
        super(BoolField, self).__init__(**kwargs)

    def __get__(self, inst, owner):
        # Class level access returns the field itself
        if inst is None:
            return self
        stored = inst._fields[self.key]._struct.value
        return stored == 1

    def __set__(self, inst, value):
        if value:
            flag = 1
        else:
            flag = 0
        inst._fields[self.key]._struct.value = flag
class StringField(ReadWriteField):
    """Field holding a UTF-8 encoded string of at most `size` bytes

    :param size: capacity of the field in bytes
    """

    initial = ''

    def __init__(self, size=defaults.DEFAULT_STRING_SIZE, **kwargs):
        self.size = size
        self.buffer_type = ctypes.c_char * size
        super(StringField, self).__init__(**kwargs)

    @property
    def type_signature(self):
        """Struct format of the field: `size` bytes of string data"""
        return '%ds' % self.size

    def __get__(self, inst, owner):
        # Class level access returns the field itself
        if inst is None:
            return self
        raw = inst._fields[self.key]._struct.value
        return raw.decode('utf8')

    def __set__(self, inst, value):
        limit = self.size
        if isinstance(value, unicode):
            value = value.encode('utf8')
            if len(value) > limit:
                # Cutting the encoded bytes may split a character; a decode
                # (ignoring errors) / encode round trip makes sure only
                # valid utf8 bytes are stored
                trimmed = value[:limit]
                value = trimmed.decode('utf8', 'ignore').encode('utf8')
        elif len(value) > limit:
            # Byte strings are cut to the capacity as they are
            value = value[:limit]
        inst._fields[self.key]._struct.value = value
class StaticUIntField(ReadOnlyField):
    """Read-only, unbuffered field holding an unsigned 32bit integer"""

    type_signature = 'I'
    buffer_type = ctypes.c_uint32
class StaticInt64Field(ReadOnlyField):
    """Read-only, unbuffered field holding a signed 64bit integer"""

    type_signature = 'q'
    buffer_type = ctypes.c_int64
class StaticUInt64Field(ReadOnlyField):
    """Read-only, unbuffered field holding an unsigned 64bit integer"""

    type_signature = 'Q'
    buffer_type = ctypes.c_uint64
class StaticTextField(ReadOnlyField):
    """Read-only, unbuffered field holding a UTF-8 encoded string"""

    initial = ''
    # Fixed capacity of 256 bytes; the signature must describe the same size
    type_signature = '256s'
    buffer_type = ctypes.c_char * 256