2008-12-02

Stream of Consciousness

Wouldn't you like to write code like this:

reading "ebooks/345.txt" (
   lines
|> skip_until (matches "^\\*\\*\\* START OF")
|> skip
|> filter is_empty
|> words
|> fmap String.uppercase
|> take 20
|> to_list
)

and have it spit:

- : string list =
["DRACULA"; "BY"; "BRAM"; "STOKER"; "1897"; "EDITION"; "TABLE"; "OF";
 "CONTENTS"; "CHAPTER"; "1"; "JONATHAN"; "HARKER'S"; "JOURNAL"; "2";
 "JONATHAN"; "HARKER'S"; "JOURNAL"; "3"; "JONATHAN"]

(yes, it's the Project Gutenberg ebook of Bram Stoker's Dracula, in the text version; a suitably large corpus to run tests on). That would read: "Safely open the named file as input; parse it into lines, skipping until finding the line that matches the regular expression, then skipping that line too. Avoid empty lines, and split the remainder into words. Change the words to upper case, take the first twenty, and make a list of those." Well, this can be done with very nice code. But first, I'd like to do away with the somewhat "normal" bits.

Regular expressions in OCaml live in the Str module; they're low-level and inconvenient to use. A little wrapping-up makes them more palatable:

let matches patt =
  let re = Str.regexp patt in
  fun str -> Str.string_match re str 0

Note that I close over the compilation of the regular expression, so that repeated calls to matches with the same regexp don't waste effort. Of course, is_empty is simply:

let is_empty = matches "^[\t ]*$"
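
A quick sanity check of both helpers (a standalone sketch; it assumes the Str library is linked, for instance by loading str.cma in the toplevel):

```ocaml
(* matches compiles the pattern once; the returned closure reuses it. *)
let matches patt =
  let re = Str.regexp patt in
  fun str -> Str.string_match re str 0

let is_empty = matches "^[\t ]*$"
let is_start = matches "^\\*\\*\\* START OF"

let () =
  assert (is_start "*** START OF THIS PROJECT GUTENBERG EBOOK");
  assert (not (is_start "CHAPTER 1"));
  assert (is_empty "   ");
  assert (not (is_empty "DRACULA"))
```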

The mysterious "piping arrow" is nothing more than function composition in disguise:

let (|>) f g x = g (f x)

This is the reverse (or categorical) composition; its more popular sibling (one could say, its twin) is:

let (%) f g x = f (g x)

The magic sauce in the program is the use of OCaml's imperative streams. They are like lists, but with a lazy tail which is accessed by destructively taking the head. Like lazy lists, they allow for computation with infinite data, and they consume just as much of the input as they need. Unlike lazy lists, they don't support persistence, that is, the ability to keep references to arbitrary versions of a value. For stream processing of text files they're a perfect match. The idea is to work with functions from streams to streams, or stream transformers. This is made very pleasing by the CamlP4 syntax extension for streams and stream parsers. To use it, you must compile your code with ocamlc -pp camlp4o, or #load "camlp4o.cma" (or #camlp4o if you're using findlib) in your source file, as explained in the manual.
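
The destructive reads can be seen directly with the Stream module, without any syntax extension (a small sketch; on modern OCaml the legacy Stream module lives in the camlp-streams compatibility package):

```ocaml
(* Reading a stream advances it in place: there is no way back. *)
let s = Stream.of_list [1; 2; 3]

let () =
  assert (Stream.peek s = Some 1);  (* peek looks without consuming *)
  assert (Stream.next s = 1);       (* next consumes the head... *)
  assert (Stream.next s = 2);       (* ...so the stream has moved on *)
  assert (Stream.count s = 2)       (* two elements discarded so far *)
```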

By far the simplest transformer is the one that skips the head of a stream, and returns the (now headless) result:

let skip str = match str with parser
| [< '_ >] ->  str
| [< >]    -> [< >]

Here the single quote indicates that we're talking about a concrete stream element, as opposed to a substream. It's natural to extend this to a transformer that keeps dropping elements as long as they satisfy a test or predicate p:

let rec skip_while p str = match str with parser
| [< 'x when p x >] -> skip_while p str
| [< >]             -> str

This is the first pattern common to all transformers: return the passed in stream once we're done consuming, or chopping off, elements at its head. This is signalled by the use of the empty pattern [< >] which matches any stream. Of course the opposite filter is also handy and comes for free:

let skip_until p = skip_while (not % p)

We now know how skip_until (matches "^\\*\\*\\* START OF") |> skip works. A variation in the beheading game is to drop a number of elements from the stream:

let rec drop n str = match str with parser
| [< '_ when n > 0 >] -> drop (pred n) str
| [< >]               -> str

(note that this function is purposefully partial). The converse is to take a finite number of elements from a stream:

let rec take n str = match str with parser
| [< 'x when n > 0 >] -> [< 'x; take (pred n) str >]
| [< >]               -> [< >]

(note that this function is also purposefully partial). This is the second pattern common to all transformers: insert a substream on the right side of a matching by recursively invoking the transformer. Similar to this code is the filtering of a stream to weed out the elements satisfying a predicate:

let rec filter p str = match str with parser
| [< 'x when p x >] ->        filter p str
| [< 'x          >] -> [< 'x; filter p str >]
| [< >]             -> [< >]

In the first match, the recursive call to filter sits outside a stream bracket pair; this means it will eagerly call itself until it finds an element not satisfying the predicate.

It's not too difficult to devise a fold over the elements of a stream:

let rec fold f e str = match str with parser
| [< 'x >] -> fold f (f e x) str
| [<    >] -> e

Of course, infinite streams cannot be folded over in finite time, but that's to be expected. This lets me write useful operations in a really concise way:

let length str = fold (fun n _ -> succ n) 0 str

let to_list str = List.rev (fold (fun l x -> x :: l) [] str)
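
In plain Stream terms (no syntax extension), fold is just iteration with an accumulator; here is a hypothetical desugared sketch, equivalent in behavior to the parser version above:

```ocaml
(* fold threads an accumulator through the whole stream, consuming it. *)
let fold f e str =
  let acc = ref e in
  Stream.iter (fun x -> acc := f !acc x) str;
  !acc

let length str = fold (fun n _ -> succ n) 0 str
let to_list str = List.rev (fold (fun l x -> x :: l) [] str)

let () =
  assert (length (Stream.of_list [10; 20; 30]) = 3);
  assert (to_list (Stream.of_string "abc") = ['a'; 'b'; 'c'])
```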

With this, a pipeline like skip_until (matches "^\\*\\*\\* START OF") |> skip |> filter is_empty |> take 20 |> to_list just works, if only we had something to feed it. It could be handy to zip two streams together. Unfortunately, I can find no straightforward way to synchronize the advance of both streams, as there isn't a parallel match of two stream parsers. If you run camlp4o on a file containing the definitions so far, you'll see that the syntax gets rewritten (or expanded) into direct OCaml code that calls into the functions marked "For system use only, not for the casual user" in stream.mli. It is possible to emulate the result of a hypothetical stream parser doing the zipping by writing directly:

let rec zip str str' =
  match Stream.peek str, Stream.peek str' with
  | Some x, Some y ->
    Stream.junk str;
    Stream.junk str';
    Stream.icons (x, y) (Stream.slazy (fun () -> zip str str'))
  | _  -> Stream.sempty
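
Since zip uses the library functions directly, it needs no preprocessing; here is a usage sketch (zip repeated verbatim so the snippet stands alone):

```ocaml
let rec zip str str' =
  match Stream.peek str, Stream.peek str' with
  | Some x, Some y ->
    Stream.junk str;
    Stream.junk str';
    Stream.icons (x, y) (Stream.slazy (fun () -> zip str str'))
  | _ -> Stream.sempty

let () =
  let z = zip (Stream.of_list [1; 2; 3]) (Stream.of_list ["a"; "b"]) in
  assert (Stream.next z = (1, "a"));
  assert (Stream.next z = (2, "b"));
  assert (Stream.peek z = None)   (* the shorter stream ends the zip *)
```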

With a simple stream like:

let nat () = Stream.from (fun i -> Some i)

(why is it a function? Also, note Conal Elliott's remark in his recent post about streams being functions over the natural numbers), a transformer for numbering a stream is as easy as (nat |> zip) (). Another way to build streams is to use the syntax:

let rec repeat e = [< 'e; repeat e >]

This is possibly the simplest infinite stream you can have. As the right-hand sides of the matchings in the preceding functions hint at, stream expressions are not restricted to parsers; they can appear anywhere an expression is expected.

As is well known to Haskellers, streams have algebraic and categorical structure. For instance, they are functors; in other words, they can be mapped over their elements with a function that transforms them pointwise:

let rec fmap f str = match str with parser
| [< 'x >] -> [< 'f x; fmap f str >]
| [<    >] -> [< >]

In this case, the call to f is lazy, and not performed until the head of the resulting stream is requested. This may or may not be what you want; for a strict evaluation of the head, a let binding must be used.
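
The deferral can be observed with a counter. This is a sketch using the plain Stream API, behaving like what the bracket syntax expands to; the strict variant would bind f x with a let before building the result:

```ocaml
(* A lazily-mapping transformer: f runs only when an element is demanded. *)
let fmap f str =
  Stream.from (fun _ ->
    match Stream.peek str with
    | Some x -> Stream.junk str; Some (f x)
    | None   -> None)

let () =
  let calls = ref 0 in
  let s = fmap (fun x -> incr calls; x * x) (Stream.of_list [3; 4]) in
  assert (!calls = 0);          (* building the stream ran f zero times *)
  assert (Stream.next s = 9);   (* demanding the head runs f once *)
  assert (!calls = 1)
```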

Streams also form a monad, entirely analogous to the list monad. The unit simply returns the one-element stream:

let return x = [< 'x >]

The bind concatenates the streams resulting from the application of a function to each element in turn:

let rec bind str f = match str with parser
| [< 'x >] -> [< f x; bind str f >]
| [<    >] -> [< >]

Note that the difference between fmap and bind is a single quote! This is a direct consequence of the fact that every monad is a functor, via the identity fmap f x = bind x (return % f) (in the composition notation defined above). Now words is straightforward:

let words =
  let re = Str.regexp "[\n\t ]+" in fun str ->
  bind str (Str.split re |> Stream.of_list)

Only one piece of the puzzle remains: how does an input channel get transformed into a stream of lines? With a transformer (a parser, actually), of course. This is a rather simple one (especially compared to the imperative monstrosity it replaced; you don't want to know), once you dissect it into its components:

let lines =
  let buf = Buffer.create 10 in
  let get () = let s = Buffer.contents buf in Buffer.clear buf; s
  in
  let islf = parser
  | [< ''\n' >] -> ()
  | [< >]       -> ()
  in
  let rec read str = match str with parser
  | [< ''\n' >] ->           [< 'get (); read str >]
  | [< ''\r' >] -> islf str; [< 'get (); read str >]
  | [< 'c    >] -> Buffer.add_char buf c; read str
  | [<       >] -> [< >]
  in read

The characters in a line are accumulated in the buf buffer. Once a line is complete, get resets the buffer and returns its contents. The sub-parser islf tries to eat the LF in a CRLF pair. Since it includes an empty match, it cannot fail. The recursive parser read scans for an LF, CR or CRLF terminator, accumulating characters until a complete line can be included in the result stream. Invoking islf in the right-hand side of the CR match in read is safe, in the sense that it cannot lead to back-tracking.

At last we get to the point where we can read from a file. For that, Stream.of_channel directly and efficiently turns the contents of a file into a stream of characters:

let reading file k =
  let inch = open_in_bin file in
  try
    let res = k (Stream.of_channel inch) in
    close_in inch; res
  with e -> close_in inch; raise e

where k is the continuation that is safely wrapped in an exception handler that closes the file even in the event of an error. Safe input-output in the face of exceptions is an idiom borrowed from LISP, the well-known WITH-OPEN-FILE.

You could find various ways to exercise this small DSL; by way of example, a word count:

reading "ebooks/345.txt" (
   lines
|> skip_until (matches "^\\*\\*\\* START OF")
|> skip
|> filter is_empty
|> words
|> length
)
- : int = 163718

Numbering the first lines in the book:

reading "ebooks/345.txt" (
   lines
|> skip_until (matches "^\\*\\*\\* START OF")
|> skip
|> filter is_empty
|> fmap String.uppercase
|> zip (nat ())
|> take 10
|> to_list
)
- : (int * string) list =
[(0, "DRACULA"); (1, "BY"); (2, "BRAM STOKER"); (3, "1897 EDITION");
 (4, "TABLE OF CONTENTS"); (5, "CHAPTER");
 (6, "  1  JONATHAN HARKER'S JOURNAL");
 (7, "  2  JONATHAN HARKER'S JOURNAL");
 (8, "  3  JONATHAN HARKER'S JOURNAL");
 (9, "  4  JONATHAN HARKER'S JOURNAL")]

This is of course not very accurate: the first line should be numbered 1, not 0, and the line numbers correspond to the filtered stream, not to the text. Let's try something else: define the arrow parts of the pair projection functions:

let first  f (x, y) = (f x, y)
and second g (x, y) = (x, g y)
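
A quick check that each projection acts on the intended component (a standalone sketch):

```ocaml
let first  f (x, y) = (f x, y)
and second g (x, y) = (x, g y)

let () =
  assert (first  succ          (1, "dracula") = (2, "dracula"));
  assert (second String.length ("ch", "four") = ("ch", 4))
```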

Now, number the lines as read, and then apply the processing to the second component of the paired-up lines:

reading "ebooks/345.txt" (
   lines
|> zip (nat ())
|> skip_until (snd |> matches "^\\*\\*\\* START OF")
|> skip
|> filter (snd |> is_empty)
|> (fmap % second) String.uppercase
|> take 10
|> to_list
)
- : (int * string) list =
[(34, "DRACULA"); (36, "BY"); (38, "BRAM STOKER"); (41, "1897 EDITION");
 (46, "TABLE OF CONTENTS"); (49, "CHAPTER");
 (51, "  1  JONATHAN HARKER'S JOURNAL");
 (52, "  2  JONATHAN HARKER'S JOURNAL");
 (53, "  3  JONATHAN HARKER'S JOURNAL");
 (54, "  4  JONATHAN HARKER'S JOURNAL")]

I hope this helps in reducing Haskell envy a little bit.

2 comments:

Anonymous said...

Couldn't help it :)

#!/bin/ksh

function skip_until {
sed -n '/^\*\*\* START OF/,$p'
}

function skip_one {
sed -e '1,2d'
}

function drop_empty {
sed -e '/^\s*$/d'
}

function words {
  while read line
  do
    for i in $line
    do
      echo $i
    done
  done
}

function uc {
tr '[:lower:]' '[:upper:]'
}

cat 345.txt | skip_until | skip_one | drop_empty | words | uc | head -20

# numbering
#cat 345.txt | skip_until | skip_one | drop_empty | uc | cat -n | head -10

Matías Giovannini said...

So you found me out! I'd not so much say that my inspiration was pipeline processing in Unix; it was rather that I tried to make a Unix pipeline work in OCaml.