Processing a compressed JSON stream
Recently I had a technical problem to solve for a dojo4 client. They receive a realtime stream of newline-delimited JSON events, compressed with zlib, from a data provider over a raw TCP socket.
The datastream protocol is pretty simple:

- Open a TCP connection to a designated host:port.
- Receive a stream of zlib-compressed, newline-delimited JSON events forever.

There is no protocol authentication. The app I'm working on is assigned a specific host:port by the data provider to connect to, and I have told the provider what IP address the application is connecting from. That's it.
Given all of that, the objective is to write a client that:

- connects to the socket
- continually reads data from the socket
- decompresses that data into a stream of newline-delimited JSON
- parses that JSON
- hands the parsed object off to the next stage of the pipeline
Connecting and Reading
This is the initial script to test out connecting and receiving data to make sure that works. It connects to the given TCP host and port and reads 1 megabyte of data from the stream. The full scripts are available via the gist links; I'll just be showing the relevant parts inline in this post.
# .. see the full gist for the entire script
#
# Create the socket and make it non-blocking since this application is doing other things
# too
logger.info "Connecting to datasource"
socket = ::Socket.tcp(host, port)
socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # make it non-blocking
logger.info "Socket created #{socket}"

read_buffer = String.new # A reusable string for use by readpartial
bufsize     = 64*1024    # read up to this amount of data from the socket at a time
stop_after  = 1024*1024  # 1 megabyte of data
total_bytes = 0
read_count  = 0

logger.info "Reading..."
loop do
  bytes = socket.readpartial(bufsize, read_buffer)
  total_bytes += bytes.bytesize
  read_count += 1
  break if total_bytes > stop_after
end
logger.info "Stopped after #{total_bytes} bytes read in #{read_count} reads"
And when this is run:
$ ./01_connecting-client.rb $HOST $PORT
2021-01-30T21:10:56Z 27940 INFO : Socket created
2021-01-30T21:10:56Z 27940 INFO : Reading...
2021-01-30T21:10:56Z 27940 INFO : Stopped after 1051250 bytes read in 641 reads
Excellent, the connection works and bytes are received.
Reading zlib-compressed data
Now the next bit is to decompress the zlib stream. Some might think this is just a matter of passing it off to gunzip. That would be incorrect. This is an infinite stream of bytes from a socket. And although the gzip file format is compressed with the DEFLATE compression algorithm implemented by zlib, the gzip file format has a header and a footer. Headers and footers are not possible in a continuous stream, which has no beginning or end.
Luckily, handling DEFLATE is built into the Ruby standard library via zlib. For each of those buffers read from the socket, decompress it. Updating the code around the loop results in the following:
# .. see the full gist for the entire script
#
compressed_buffer = String.new # A reusable string for use by readpartial for compressed bytes
inflater = ::Zlib::Inflate.new(::Zlib::MAX_WBITS + 32)
uncompressed_bytes = 0

logger.info "Reading..."
logger.info "Writing to #{output_to}"
output = output_to == "-" ? $stdout : File.open(output_to, "w+")

loop do
  socket.readpartial(bufsize, compressed_buffer)
  total_bytes += compressed_buffer.bytesize
  read_count += 1

  uncompressed_buffer = inflater.inflate(compressed_buffer)
  uncompressed_bytes += uncompressed_buffer.bytesize
  output.write(uncompressed_buffer)

  break if total_bytes > stop_after
end
output.close

logger.info "Read #{read_count} times from data source"
logger.info "Received #{total_bytes} of compressed data"
logger.info "Resulting in #{uncompressed_bytes} of decompressed data"
One of the fields in the JSON is t, whose value is a timestamp. If I run output.json through jq, extract out the t values, and those look good, then it's an indicator the processing is working correctly.
$ ./decompressing_client.rb $HOST $PORT output.json
2021-01-30T21:45:45Z 28817 INFO : Socket created
2021-01-30T21:45:45Z 28817 INFO : Reading...
2021-01-30T21:45:45Z 28817 INFO : Writing to output.json
2021-01-30T21:45:45Z 28817 INFO : Read 295 times from data source
2021-01-30T21:45:45Z 28817 INFO : Received 1049591 of compressed data
2021-01-30T21:45:45Z 28817 INFO : Resulting in 3858614 of decompressed data
$ wc output.json
2805 51512 3858614 output.json
$ jq .t < output.json > /dev/null
parse error: Unfinished string at EOF at line 2806, column 1431
After running it and testing - it looks good - except for the last line, which is garbled JSON. This is to be expected: the bytes read from the data stream are compressed bytes, and the code decompresses them as blocks. The data is not line-oriented yet.
Converting blocks of text to newlines with a pipe
Normally when reading from an IO object in Ruby, I would use IO#readline or IO#gets to parse the input into lines. In this case, the decompressed bytes are not in an IO object; they are a block of bytes in a String, and there may or may not be a newline in it depending on how much was read and decompressed from the socket.
Originally I thought about writing something similar to a Java BufferedReader to convert the uncompressed bytestream into lines. And then realized it already exists in Ruby - IO.pipe.
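For the curious, the hand-rolled version would have looked something like this - a hypothetical sketch of a carry-over buffer that splits chunks on newlines (LineBuffer is my name for it, not anything from the gist):

# Hypothetical sketch - accumulate chunks, yield complete lines, and keep
# any trailing partial line around for the next chunk.
class LineBuffer
  def initialize
    @pending = String.new # unterminated tail from the previous chunk
  end

  # Append a chunk of bytes and yield each complete newline-terminated line.
  def feed(chunk)
    @pending << chunk
    while (idx = @pending.index("\n"))
      yield @pending.slice!(0..idx)
    end
  end
end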
IO.pipe creates a pair of pipe endpoints (connected to each other) and returns them as a two-element array of IO objects: [read_io, write_io].
If the decompressed bytes are written to one end of the pipe, then lines may be read from the other end of the pipe, since the read end of the pipe is an IO object and has both gets and readline methods. In short, something like this:
read_io, write_io = IO.pipe
write_io.write(bunch_of_bytes)
while line = read_io.gets do
  # do something with line
end
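One caveat worth noting: a pipe has a finite kernel buffer (commonly 64 KB on Linux), so write will block once that buffer fills if nothing is reading from the other end. That is why the writer and the reader end up in separate threads below.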
Bringing it all together
This changes the architecture of the program and moves it into a concurrent programming style. There needs to exist:

- one thread to read the datastream from the socket, decompress it, and send it down the IO pipe
- another thread to read lines from the pipe and parse the JSON
- a third to process the parsed JSON.
First - extract the reading from the socket and the decompression into a class that will be put in its own thread.
# .. see the full gist for the entire script
#
# class to read data from an input IO, decompress the data, and write it to an
# output IO. It'll collect stats during the process.
#
class Decompressor
  attr_reader :input
  attr_reader :output
  attr_reader :stop_after
  attr_reader :buffer_size
  attr_reader :compressed_bytes
  attr_reader :uncompressed_bytes
  attr_reader :read_count

  def initialize(input:, output:, stop_after: Float::INFINITY)
    @input = input
    @output = output
    @stop_after = stop_after
    @buffer_size = 64*1024 # How much maximum data to read from the socket at a go
    @compressed_bytes = 0
    @uncompressed_bytes = 0
    @read_count = 0
  end

  def call
    compressed_buffer = String.new
    inflater = ::Zlib::Inflate.new(::Zlib::MAX_WBITS + 32)

    loop do
      input.readpartial(@buffer_size, compressed_buffer)
      @compressed_bytes += compressed_buffer.bytesize
      @read_count += 1

      uncompressed_buffer = inflater.inflate(compressed_buffer)
      @uncompressed_bytes += uncompressed_buffer.bytesize
      output.write(uncompressed_buffer)

      break if @compressed_bytes > @stop_after
    end
    output.close
  end
end
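To sanity check the class in isolation, something like this works - a hypothetical smoke test feeding it zlib-compressed data from in-memory IO objects instead of a live socket (the payload and the stop_after value here are made up for the test):

require "zlib"
require "stringio"

payload    = %({"t":"2021-01-30T21:10:56Z"}\n) * 100
compressed = Zlib::Deflate.deflate(payload) # zlib-wrapped DEFLATE data

input  = StringIO.new(compressed)
output = StringIO.new

# stop_after: 1 makes the loop exit after the first read instead of
# raising EOFError on the drained StringIO
Decompressor.new(input: input, output: output, stop_after: 1).call

output.string.lines.count # => 100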
Next - extract the reading of the decompressed data into lines, and the parsing of those lines into JSON, into its own class. This will also be put into a thread.
# .. see the full gist for the entire script
#
# class to read lines from an input and write the parsed object to something
# else that responds to `<<`
#
class Parser
  attr_reader :item_count
  attr_reader :input_bytes

  def initialize(input:, output:)
    @item_count = 0
    @input_bytes = 0
    @stop = false
    @input = input
    @output = output
  end

  def stop
    @stop = true
  end

  def call
    loop do
      break if @stop
      line = @input.readline
      @input_bytes += line.bytesize

      event = JSON.parse(line)
      @output << event
      @item_count += 1
    end
  end
end
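And similarly, a hypothetical check of Parser in isolation - feeding it a couple of JSON lines from a StringIO and collecting the parsed events in an Array (which responds to <<):

require "json"
require "stringio"

input  = StringIO.new(%({"t":1}\n{"t":2}\n))
events = []

parser = Parser.new(input: input, output: events)
begin
  parser.call
rescue EOFError
  # readline raises EOFError once the StringIO is drained; on the real
  # pipe the reader would instead block waiting for more data
end

events.map { |e| e["t"] } # => [1, 2]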
And finally, tie all of it together using IO.pipe and a Queue so the parser can shovel the events off to something else.
# .. see the full gist for the entire script
#
# Create the socket and make it non-blocking since this application is doing other things
# too
socket = ::Socket.tcp(host, port)
socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # make it non-blocking

# Create a pipe to buffer the uncompressed data from the socket so that the text may be
# parsed into newlines.
#
read_io, write_io = IO.pipe
write_io.set_encoding("BINARY") # to handle multibyte-character splitting

events = Queue.new
decompressor = Decompressor.new(input: socket, output: write_io, stop_after: stop_after)
parser = Parser.new(input: read_io, output: events)

# spawn threads for each of the objects
decompressor_thread = Thread.new { decompressor.call }
parser_thread = Thread.new { parser.call }

# spawn a thread to consume all the events from the parser and throw them away
consumed_count = 0
consumer_thread = Thread.new {
  loop do
    e = events.deq
    consumed_count += 1 unless e.nil?
    break if events.closed? && events.empty?
  end
}
You may be wondering about the line write_io.set_encoding("BINARY"). This particular item took a while to figure out. The data coming out of the decompressor is raw bytes. Those bytes need to be interpreted as UTF-8 characters, since JSON requires UTF-8. It is pretty much guaranteed that at some point a multibyte UTF-8 character is going to be split across decompression chunks.

By default the pipe ends up with a default encoding of UTF-8 on both ends. When the decompressor writes uncompressed bytes to the pipe, if there is a partial multibyte UTF-8 character in that write, then Ruby will raise an exception, since the byte sequence is not a valid UTF-8 sequence.
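To make the splitting problem concrete, here is a small illustration of mine (not from the gist): "é" is two bytes in UTF-8, and a chunk boundary can land between them, leaving each half invalid on its own.

bytes = "é".b    # => "\xC3\xA9" - the two UTF-8 bytes, as BINARY
first = bytes[0] # a chunk boundary could split the character here
first.force_encoding("UTF-8").valid_encoding? # => false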
With write_io.set_encoding("BINARY"), the write side of the pipe now has a BINARY encoding set, and the pipe is effectively a buffer that converts unencoded bytes into line-oriented UTF-8 characters.
Running this new script results in:
$ ./json_parsing_client.rb $HOST $PORT
2021-02-01T20:19:24Z 10004 INFO : Decompressor: read 245 times
2021-02-01T20:19:24Z 10004 INFO : Decompressor: received 1041114 bytes
2021-02-01T20:19:24Z 10004 INFO : Decompressor: forwarded on 3844323 bytes
2021-02-01T20:19:24Z 10004 INFO : Parser : received 3814991
2021-02-01T20:19:24Z 10004 INFO : Parser : forwarded on 2581 events
2021-02-01T20:19:24Z 10004 INFO : Consumer : threw away 2581 events
And now I have a proof-of-concept, compressed JSON stream client that I can flesh out to use in the application.