Today a potentially useful code sample: class StringBuffer. The class is a buffer into which you can write strings using various printing methods, and from which you can read all the data in order of arrival. This object can be useful for one-way communication between two threads (or two such objects for two-way communication); it is thread-safe.
The class will use StringIO, a standard-library object that has several printing methods (we will add two more) and that allows random-access reading and writing. We will remember the read position ourselves and always write at the end of the buffer. The buffer will also be emptied once it gets too long, to prevent high (unbounded) memory usage.
Here's the code, with comments inside this time.
require 'pp' # allows method pp (pretty print); test in irb
require 'thread'
require 'stringio' # StringIO lives in the standard library, so load it explicitly
# Thread synchronizer. A thread calls +wait+ and is stopped
# until the +timeout+ passes or until the method +signal+
# of this object is called to release all waiting threads.
class Synchronizer
  def initialize
    @waiting=[]
    @mutex=Mutex::new
  end
  attr_reader :mutex # in case somebody wants to use it
  def wait(timeout=nil)
    thr=Thread.current
    begin
      # be sure to add myself to the list of waiting threads
      @mutex.synchronize{@waiting<<thr}
      # sleep given time or forever (yes, it does it):
      # [nil].compact is empty, so +sleep+ then gets no argument
      sleep(*[timeout].compact)
    ensure
      # be sure to remove myself
      @mutex.synchronize{@waiting.delete(thr)}
    end
  end
  def signal
    # wake up all waiting threads
    @mutex.synchronize{@waiting.each{|t| t.wakeup}}
  end
  # Check if the thread is currently waiting
  def waiting_thread?(thr)
    raise ArgumentError,"Argument must be Thread!"\
      unless thr.is_a? Thread
    @waiting.include?(thr)
  end
end
# We add two methods to the +StringIO+.
class StringIO
  # Inspect into the buffer (self).
  def p(*args)
    puts args.map{|a| a.inspect}
  end
  # Pretty print into buffer (self).
  def pp(*args)
    args.each{|a| PP.pp(a,self)}
    nil
  end
end
class StringBuffer
  # List of writing methods.
  WRITE_METHODS=[:write,:<<,:print,:puts,:putc,:printf,:p,:pp]
  # List of reading methods.
  READ_METHODS=[:read,:gets,:getc,:readchar,:readline]
  # If this much data is in the buffer, empty it.
  TRUNC_LENGTH=1000
  # dynamically define all writing methods
  WRITE_METHODS.each\
  { |wm|
    class_eval(
      <<-METHOD
        def #{wm}(*args)
          # safely (mutex)
          @mutex.synchronize\
          {
            # move the +StringIO+ pointer to the end
            @buff.pos=@buff.length
            # call the same method on the internal buffer
            ret=@buff.#{wm}(*args)
            # signal the synchronizer in case
            # some thread was waiting for data
            @synchronizer.signal unless empty?
            ret
          }
        end
      METHOD
    )
  }
  # dynamically define read methods
  READ_METHODS.each\
  { |rm|
    class_eval(
      <<-METHOD
        def #{rm}(*args)
          @mutex.synchronize\
          {
            # move pointer to the saved position of last read
            @buff.pos=@r
            # perform the read
            ret=@buff.#{rm}(*args)
            # save the new pointer
            @r=@buff.pos
            # call +trunc+ if there are at least +TRUNC_LENGTH+
            # bytes of unnecessary data
            trunc if @r>=TRUNC_LENGTH
            ret
          }
        end
      METHOD
    )
  }
  def initialize
    @buff=StringIO::new
    # read position
    @r=0
    @mutex=Mutex::new
    # synchronizer for threads waiting for data
    @synchronizer=Synchronizer::new
  end
  attr_reader :synchronizer
  def length
    @buff.length-@r
  end
  def eof?
    length==0
  end
  alias empty? eof?
  def wait_for_data(timeout=nil)
    @synchronizer.wait(timeout) if empty?
    self unless empty?
  end
  private
  def trunc
    # +pos+ counts bytes, so cut by bytes, not by characters
    @buff.string=@buff.string.byteslice(@r..-1)
    @r=0
  end
end
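The way StringBuffer generates its read and write methods may be easier to follow in isolation. Below is a minimal sketch of the same class_eval-with-heredoc trick; the class CountingIO, its FORWARDED list and the call counter are hypothetical names invented for this illustration and are not part of the code above.

```ruby
require 'stringio'

# Hypothetical wrapper, for illustration only: forwards a few methods
# to an internal StringIO and counts how often each one is called.
class CountingIO
  FORWARDED = [:print, :puts, :<<]

  FORWARDED.each do |name|
    # The heredoc is ordinary text until class_eval compiles it, so the
    # method name is substituted into the source before Ruby parses it.
    class_eval <<-METHOD
      def #{name}(*args)
        @calls[:#{name}] += 1
        @io.#{name}(*args)
      end
    METHOD
  end

  attr_reader :calls

  def initialize
    @io = StringIO.new
    @calls = Hash.new(0)
  end

  def string
    @io.string
  end
end

c = CountingIO.new
c.print "a"
c << "b"
c.puts "c"
c.string          # => "abc\n"
c.calls[:print]   # => 1
```

Building the method from a string is what lets an operator name such as << be used as a method name here; define_method with a block would probably work just as well for this purpose.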
A small test:
irb(main):002:0> s=StringBuffer::new
=> #<StringBuffer:0x2bf4f44 @mutex=#<Mutex:0x2bf4ef4>, @r=0,
# @buff=#<StringIO:0x2bf4f1c>,
# @synchronizer=#<Synchronizer:0x2bf4ee0 @mutex=#<Mutex:0x2bf4e54>,
# @waiting=[]>>
irb(main):003:0> Thread::new{loop{sleep 1;s.print "X"}}
=> #<Thread:0x2bf0b60 sleep>
irb(main):004:0> loop{s.wait_for_data;puts s.read}
XXXXXXXXXX
X
X
X
X
X
X
X
X
The first line with lots of X'es is because that many of them had accumulated in the buffer before I called the command in line 004.
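The accumulation can be reproduced without threads. The following is a minimal sketch on a bare StringIO, assuming a local variable r that plays the role of @r; the lambdas write_x and read_new are hypothetical helpers that only mimic what the generated write and read methods do with the pointer.

```ruby
require 'stringio'

buff = StringIO.new
r = 0   # hypothetical local standing in for StringBuffer's @r

# Appends one "X" at the end, wherever the last read left the pointer.
write_x = lambda do
  buff.pos = buff.length
  buff.print "X"
end

# Reads everything that arrived since the previous read.
read_new = lambda do
  buff.pos = r
  data = buff.read
  r = buff.pos
  data
end

3.times { write_x.call }   # the writer runs for a while before anyone reads
first = read_new.call      # => "XXX"

write_x.call               # from now on the reader keeps up
second = read_new.call     # => "X"
third = read_new.call      # => "" (nothing new has arrived)
```

The first read returns everything written so far in one piece, which matches the long first line of the irb session; later reads only see what arrived since the saved position.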