2008-09-16

StringBuffer

Today a potentially useful code sample:
class StringBuffer
. The class is a buffer into which you can write strings using various printing methods, and from which you can read all the data in the order of arrival. This object can be useful in one-way communication between two threads (or two such objects in two-way comunication), it is thread-safe.

The class will use
StringIO
- a built-in object that has several printing methods (we will add two more) and which allows random access reading and writing. We will remember the reading and writing points in the buffer, and use them during read and write commands. The buffer will also be emptied once it gets too long, to prevent high (infinite) memory usage.

Here's the code, with comments inside this time.
require 'pp' # allows method pp (pretty print); test in irb
require 'thread'

require 'thread'

# Thread synchronizer. A thread calls +wait+ and is stopped
# until the +timeout+ passes or until the method +signal+
# of this object is called to release all waiting threads.
class Synchronizer

def initialize
@waiting=[]
@mutex=Mutex::new
end

attr_reader :mutex # in case somebody wants to use it

def wait(timeout=nil)
thr=Thread.current
begin
# be sure to add myself to the list of waiting threads
@mutex.synchronize{@waiting<<thr}
# sleep given time or forever (yes, it does it)
sleep(*[timeout].compact)
ensure
# be sure to remove myself
@mutex.synchronize{@waiting.delete(thr)}
end
end

def signal
# wake up all waiting threads
@mutex.synchronize{@waiting.each{|t| t.wakeup}}
end

# Check if the thread is currently waiting
def waiting_thread?(thr)
raise ArgumentError,"Argument must be Thread!"\
unless thr.is_a? Thread
@waiting.include?(thr)
end

end

# We add two methods to the +StringIO+.
class StringIO

# Inspect into the buffer (self).
def p(*args)
puts args.map{|a| a.inspect}
end

# Pretty print into buffer (self).
def pp(*args)
args.each{|a| PP.pp(a,self)}
nil
end

end

class StringBuffer

# List of writing methods.
WRITE_METHODS=[:write,:<<,:print,:puts,:putc,:printf,:p,:pp]

# List of reading methods.
READ_METHODS=[:read,:gets,:getc,:readchar,:readline]

# If this much data is in the buffer, empty it.
TRUNC_LENGTH=1000

# dynamically define all writing methods
WRITE_METHODS.each\
{ |wm|
class_eval(
<<-METHOD
def #{wm}(*args)
# safely (mutex)
@mutex.synchronize\
{
# move the +StringIO+ pointer to the end
@buff.pos=@buff.length
# call the same method on the internal buffer
ret=@buff.#{wm}(*args)
# signal the synchronizer in case
# some thread was waiting for data
@synchronizer.signal unless empty?
ret
}
end
METHOD
)
}

# dynamically define read methods
READ_METHODS.each\
{ |rm|
class_eval(
<<-METHOD
def #{rm}(*args)
@mutex.synchronize\
{
# move pointer to the saved position of last read
@buff.pos=@r
# perform the read
ret=@buff.#{rm}(*args)
# save the new pointer
@r=@buff.pos
# call +trunc+ if there is at least +TRUNC_LENGTH+
# bytes of unnecessary data
trunc if @r>=TRUNC_LENGTH
ret
}
end
METHOD
)
}

def initialize
@buff=StringIO::new
# read position
@r=0
@mutex=Mutex::new
# synchronizer for threads waiting for data
@synchronizer=Synchronizer::new
end

attr_reader :synchronizer

def length
@buff.length-@r
end

def eof?
length==0
end

alias empty? eof?

def wait_for_data(timeout=nil)
@synchronizer.wait(timeout) if empty?
self unless empty?
end

private

def trunc
@buff.string=@buff.string[@r..-1]
@r=0
end

end
A small test
irb(main):002:0> s=StringBuffer::new
=> #<StringBuffer:0x2bf4f44 @mutex=#<Mutex:0x2bf4ef4>, @r=0,
# @buff=#<StringIO:0x2bf4f1c>,
# @synchronizer=#<Synchronizer:0x2bf4ee0 @mutex=#<Mutex:0x2bf4e54>,
# @waiting=[]>>
irb(main):003:0> Thread::new{loop{sleep 1;s.print "X"}}
=> #<Thread:0x2bf0b60 sleep>
irb(main):004:0> loop{s.wait_for_data;puts s.read}
XXXXXXXXXX
X
X
X
X
X
X
X
The first line with lots of
X
'es is because this many of them had been accumulated in the buffer before I called the command in line
004
.

No comments: