Copyright 2020 Andy Curtis & Daniel Curtis

#include "ac_in.h"

The ac_in object is meant for reading input (generally files, but file descriptors, sets of records, and blocks of memory can also be used). It can be configured as a single input, or as a set of inputs that then acts as a single input. This allows functions to be written that operate on a variety of inputs (one file, many files, files and buffers mixed, lists of files, etc.). A set of files that are each sorted can be joined together as a single unit that remains sorted. If records are distinct within each file, but not across the set, the join can be configured to reduce the non-distinct records.

Another useful feature supported by ac_in is built-in decompression. Files with an extension of .gz or .lz4 are automatically read from their respective formats without the need to first decompress the files.
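
The extension check described above can be pictured with a small standalone sketch. The names classify_input, has_extension, and input_kind_t are hypothetical and are not part of ac_in; the library's own detection logic may differ.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical helper (not part of ac_in): true if name ends with suffix. */
bool has_extension(const char *name, const char *suffix) {
  assert(name != NULL && suffix != NULL);
  size_t n = strlen(name), s = strlen(suffix);
  return n >= s && strcmp(name + n - s, suffix) == 0;
}

/* Hypothetical classification of an input by its filename. */
typedef enum { INPUT_PLAIN, INPUT_GZ, INPUT_LZ4 } input_kind_t;

input_kind_t classify_input(const char *filename) {
  if (has_extension(filename, ".gz"))
    return INPUT_GZ;
  if (has_extension(filename, ".lz4"))
    return INPUT_LZ4;
  return INPUT_PLAIN;
}
```

With this sketch, classify_input("records.txt.gz") yields INPUT_GZ, and a name with neither extension is treated as a normal file.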

In general, the ac_in object is meant to iterate over one or more files of records where the records are prefix formatted, delimiter formatted, or fixed length. Additional formats can be added as needed, but these three cover most use cases. The prefix format places a 4 byte length at the beginning of each record. The delimiter format expects a delimiter such as a newline at the end of each record. The fixed length format expects all records to be the same size.
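
To make the prefix format concrete, here is a minimal standalone sketch that walks a block of memory holding prefix-formatted records. It does not use ac_in; record_view_t and next_prefixed_record are hypothetical names, and storing the 4 byte length in host byte order is an assumption of the sketch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical view of one record inside a buffer (not ac_io_record_t). */
typedef struct {
  const char *data;
  uint32_t length;
} record_view_t;

/* Reads the record starting at *pos in a prefix-formatted buffer.
   Returns 1 and advances *pos on success. Returns 0 at the end of the
   input or when only a partial record remains.
   Assumption: the 4 byte length is stored in host byte order. */
int next_prefixed_record(const char *buf, size_t len, size_t *pos,
                         record_view_t *out) {
  assert(buf != NULL && pos != NULL && out != NULL);
  if (*pos > len || len - *pos < sizeof(uint32_t))
    return 0; /* no room left for a length prefix */
  uint32_t n;
  memcpy(&n, buf + *pos, sizeof(n)); /* memcpy avoids unaligned reads */
  if (len - *pos - sizeof(n) < n)
    return 0; /* body is truncated: partial record */
  out->data = buf + *pos + sizeof(n);
  out->length = n;
  *pos += sizeof(n) + n;
  return 1;
}
```

A caller would start with pos set to 0 and call next_prefixed_record in a loop until it returns 0.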

Because disks are slow and RAM is fast, buffering is built into the ac_in object. A buffer size can be defined to reduce IO (particularly seeks).

The ac_in object plays an important role in sorting files. When large files are sorted, tmp files are written to disk which are themselves sorted. To produce the final sorted output, the tmp files are joined (and further reduced if necessary) through an ac_in object. The ac_out object provides ac_out_in, which can be used to avoid writing the final sorted file if desired. Chaining sorts benefits from this in that the tmp files can be piped directly into the next sort.

Because the ac_in (and ac_out) objects can be configured, pipelines of data can be defined with very little extra code. The ac_schedule object heavily uses the ac_in and ac_out objects to allow for very large data transformations in a defined environment.

All of the example code is found in examples/ac_in.


ac_in_t *ac_in_init(const char *filename, ac_in_options_t *options);

ac_in_init creates an input stream based upon a filename and options. The filename dictates whether the file is normal, gzip compressed (.gz extension), or lz4 compressed (.lz4 extension). If options are NULL, defaults will be used. NULL will be returned if the file cannot be opened.


ac_in_t *ac_in_init_with_fd(int fd, bool can_close, ac_in_options_t *options);

ac_in_init_with_fd creates an input stream based upon a file descriptor. If the input stream is compressed, that should be set through ac_in_options_gz or ac_in_options_lz4. Otherwise, if options are NULL, defaults will be used. can_close should normally be true, meaning that the file descriptor is closed when the ac_in object is destroyed.


ac_in_t *ac_in_init_with_buffer(void *buf, size_t len, bool can_free,
                                ac_in_options_t *options);

ac_in_init_with_buffer creates a stream from a buffer. If the buffer contents are compressed, that should be set through ac_in_options_gz or ac_in_options_lz4. Otherwise, if options are NULL, defaults will be used. can_free should normally be true, meaning that the buffer is freed when the ac_in object is destroyed.


ac_in_t *ac_in_records_init(ac_io_record_t *records, size_t num_records,
                            ac_in_options_t *options);

ac_in_records_init creates a stream from an array of ac_io_record_t structures.


ac_in_t *ac_in_ext_init(ac_io_compare_f compare, void *arg,
                        ac_in_options_t *options);

ac_in_ext_init creates an ac_in object to which inputs can be added via ac_in_ext_add to set up a multiple stream input.


ac_in_t *ac_in_init_from_list(ac_io_file_info_t *files, size_t num_files,
                              ac_in_options_t *options);

ac_in_init_from_list creates a stream which will open one file at a time for each of the files specified.


ac_in_t *ac_in_empty();

ac_in_empty creates an empty stream. The stream must still be destroyed once it is no longer needed.


void ac_in_limit(ac_in_t *h, size_t limit);

ac_in_limit limits the number of records returned from the stream (usually used for testing).


void ac_in_destroy_out(ac_in_t *in, ac_out_t *out,
                       void (*destroy_out)(ac_out_t *out));

ac_in_destroy_out destroys an output stream when the given input stream is destroyed.


void ac_in_ext_keep_first(ac_in_t *h);

ac_in_ext_keep_first keeps only the first equal record across multiple input streams.
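
The idea of joining sorted inputs while keeping only the first of any equal records can be sketched without the library. The example below merges sorted arrays of ints; sorted_input_t and merge_keep_first are hypothetical names, and a linear scan is used to pick the smallest value for brevity (this is not a statement about how ac_in selects the next record).

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical sorted input: an array of ints with a read position. */
typedef struct {
  const int *values;
  size_t count;
  size_t pos;
} sorted_input_t;

/* Merges num_inputs sorted inputs into out (capacity out_cap), emitting
   one value per run of equal values. On ties the lowest-indexed input
   supplies the value. Returns the number of values written. */
size_t merge_keep_first(sorted_input_t *inputs, size_t num_inputs, int *out,
                        size_t out_cap) {
  assert(inputs != NULL && out != NULL);
  size_t written = 0;
  while (written < out_cap) {
    /* Find the input whose current value is smallest; the strict <
       keeps the earliest input when values are equal. */
    size_t best = num_inputs;
    for (size_t i = 0; i < num_inputs; i++) {
      if (inputs[i].pos >= inputs[i].count)
        continue;
      if (best == num_inputs ||
          inputs[i].values[inputs[i].pos] <
              inputs[best].values[inputs[best].pos])
        best = i;
    }
    if (best == num_inputs)
      break; /* all inputs are exhausted */
    int v = inputs[best].values[inputs[best].pos];
    out[written++] = v;
    /* Skip every copy of v in every input so later equal values drop. */
    for (size_t i = 0; i < num_inputs; i++)
      while (inputs[i].pos < inputs[i].count &&
             inputs[i].values[inputs[i].pos] == v)
        inputs[i].pos++;
  }
  return written;
}
```

Merging {1, 3, 5} with {1, 2, 5, 7} this way produces {1, 2, 3, 5, 7}.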


void ac_in_ext_reducer(ac_in_t *h, ac_io_reducer_f reducer, void *arg);

ac_in_ext_reducer reduces equal records across multiple input streams.


void ac_in_ext_add(ac_in_t *h, ac_in_t *in, int tag);

ac_in_ext_add adds the input stream in to the multiple stream input h. The tag can be options->tag from the init of in if that makes sense. Otherwise, the tag can be useful to distinguish different input sources. The first param h must be initialized with ac_in_ext_init.


size_t ac_in_count(ac_in_t *h);

ac_in_count counts the records and closes the cursor.


ac_io_record_t *ac_in_advance(ac_in_t *h);

ac_in_advance advances to the next record and returns it.


ac_io_record_t *ac_in_current(ac_in_t *h);

ac_in_current returns the current record (this will be NULL if advance hasn't been called or ac_in_reset was called).


void ac_in_reset(ac_in_t *h);

ac_in_reset makes the next call to advance return the same record as the current. This is particularly helpful when advancing in a loop until a given record.
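
The advance / current / reset behavior described above can be modeled with a tiny standalone cursor over an array of ints. All names here (int_cursor_t, cursor_advance, cursor_current, cursor_reset, cursor_skip_until) are hypothetical; the sketch only mirrors the documented semantics and is not the ac_in implementation.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical cursor over an int array. */
typedef struct {
  const int *values;
  size_t count;
  size_t next;        /* index of the value the next advance returns */
  const int *current; /* NULL before the first advance and after reset */
} int_cursor_t;

/* Moves to the next value and returns it, or NULL at the end. */
const int *cursor_advance(int_cursor_t *c) {
  assert(c != NULL);
  if (c->next >= c->count) {
    c->current = NULL;
    return NULL;
  }
  c->current = &c->values[c->next++];
  return c->current;
}

const int *cursor_current(const int_cursor_t *c) { return c->current; }

/* Makes the next advance return the value that is current now. */
void cursor_reset(int_cursor_t *c) {
  if (c->current != NULL) {
    c->next--;
    c->current = NULL;
  }
}

/* Advances until a value >= target is seen, then resets so that the
   caller's next advance returns that value. */
void cursor_skip_until(int_cursor_t *c, int target) {
  const int *v;
  while ((v = cursor_advance(c)) != NULL) {
    if (*v >= target) {
      cursor_reset(c);
      return;
    }
  }
}
```

After cursor_skip_until over {1, 4, 6, 9} with a target of 5, the current value is NULL and the next advance returns 6.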


ac_io_record_t *ac_in_advance_unique(ac_in_t *h, size_t *num_r);

ac_in_advance_unique returns the next equal record across all of the streams. num_r will be the number of streams containing the next record. It is assumed that each stream will have exactly one equal record. If there is only one stream, num_r will always be 1 until the stream is finished.
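
As a rough standalone illustration of the num_r idea, the sketch below steps through several sorted int arrays, each holding a value at most once, and reports how many of them contain the next value. sorted_input_t and advance_unique_sketch are hypothetical names and do not reflect how ac_in is implemented.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical sorted input: an array of distinct ints with a position. */
typedef struct {
  const int *values;
  size_t count;
  size_t pos;
} sorted_input_t;

/* Stores the next smallest value across all inputs in *value and the
   number of inputs containing it in *num_r. Returns 1 on success and
   0 (with *num_r set to 0) once every input is exhausted. */
int advance_unique_sketch(sorted_input_t *inputs, size_t num_inputs,
                          int *value, size_t *num_r) {
  assert(inputs != NULL && value != NULL && num_r != NULL);
  int found = 0, min = 0;
  for (size_t i = 0; i < num_inputs; i++) {
    if (inputs[i].pos >= inputs[i].count)
      continue;
    int v = inputs[i].values[inputs[i].pos];
    if (!found || v < min) {
      min = v;
      found = 1;
    }
  }
  *num_r = 0;
  if (!found)
    return 0;
  /* Consume the value from every input that currently holds it. */
  for (size_t i = 0; i < num_inputs; i++) {
    if (inputs[i].pos < inputs[i].count &&
        inputs[i].values[inputs[i].pos] == min) {
      inputs[i].pos++;
      (*num_r)++;
    }
  }
  *value = min;
  return 1;
}
```

With inputs {1, 3, 5}, {1, 2, 5}, and {5}, successive calls report 1 (in 2 inputs), 2 (in 1), 3 (in 1), and 5 (in 3).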


ac_io_record_t *ac_in_advance_group(ac_in_t *h, size_t *num_r,
                                    bool *more_records, ac_io_compare_f compare,
                                    void *arg);

ac_in_advance_group takes a comparison function and an arg, gathers all of the records which compare as equal, and returns them as an array. This works with one or more input streams. num_r and more_records are output parameters. num_r will be one or more unless the end of the input is reached. If more_records is true, then the next call to ac_in_advance_group will return records which are equal to the current group.
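
Grouping by a comparison function can be sketched on a plain sorted array. The example below is standalone: int_compare_f, compare_by_tens, and advance_group_sketch are hypothetical names, and the more_records output is omitted because the whole input is in memory here.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical comparison callback over ints (not ac_io_compare_f). */
typedef int (*int_compare_f)(const int *a, const int *b, void *arg);

/* Example comparison: two values are equal if they share a tens digit. */
int compare_by_tens(const int *a, const int *b, void *arg) {
  (void)arg;
  return (*a / 10) - (*b / 10);
}

/* Returns a pointer to the next run of values that compare as equal and
   stores the run length in *num_r. Returns NULL with *num_r set to 0 at
   the end of the input. The values must already be sorted by compare. */
const int *advance_group_sketch(const int *values, size_t count, size_t *pos,
                                size_t *num_r, int_compare_f compare,
                                void *arg) {
  assert(values != NULL && pos != NULL && num_r != NULL && compare != NULL);
  *num_r = 0;
  if (*pos >= count)
    return NULL;
  const int *first = values + *pos;
  size_t n = 1;
  while (*pos + n < count && compare(first, first + n, arg) == 0)
    n++;
  *pos += n;
  *num_r = n;
  return first;
}
```

For the input {10, 11, 15, 20, 31, 32} and compare_by_tens, the groups have sizes 3, 1, and 2.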


void ac_in_destroy(ac_in_t *h);

ac_in_destroy destroys the input stream (or set of input streams).


void ac_in_options_init(ac_in_options_t *h);

ac_in_options_init initializes h to default values. ac_in_options_t is declared in impl/ac_in.h and not opaque. h is expected to point to a structure of this type (and not NULL).


void ac_in_options_buffer_size(ac_in_options_t *h, size_t buffer_size);

ac_in_options_buffer_size sets the buffer size for reading input from files. If the input is compressed, the buffer_size here is for the uncompressed content. Ideally, this would be large enough to support any record in the input. If an individual record is larger than the buffer_size, a temporary buffer will be created to hold the given record. The temporary buffer should happen as the exception (if at all).


void ac_in_options_format(ac_in_options_t *h, ac_io_format_t format);

ac_in_options_format sets the format of the records within the input. This should be called with the result of one of the ac_io_format_t functions shown below.

Prefix format (4 byte length prefix before each record)
ac_in_options_format(&options, ac_io_prefix());

Delimiter format (specify a character at the end of a record)
ac_in_options_format(&options, ac_io_delimiter('\n'));

Fixed format (all records are the same length)
ac_in_options_format(&options, ac_io_fixed(<some_length>));

Other formats may be added in the future such as compressed, protobuf, etc.


void ac_in_options_abort_on_error(ac_in_options_t *h);

ac_in_options_abort_on_error generally applies to opening compressed files which have a corrupt format. If the format of the file is corrupt, abort() will be called instead of prematurely ending the file.


void ac_in_options_allow_partial_records(ac_in_options_t *h);

ac_in_options_allow_partial_records causes an incomplete record at the end of the file to be counted as a valid record. By default, a partial record at the end of the file is dropped.
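
The difference between dropping and allowing a partial trailing record can be shown with a standalone count over a delimiter-formatted buffer. count_delimited_records is a hypothetical function written for this illustration, not part of ac_in.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Counts the records in a delimiter-formatted buffer. Bytes after the
   last delimiter form a partial record, which is only counted when
   allow_partial is true. */
size_t count_delimited_records(const char *buf, size_t len, char delim,
                               bool allow_partial) {
  assert(buf != NULL || len == 0);
  size_t count = 0;
  size_t start = 0; /* offset of the first byte of the current record */
  for (size_t i = 0; i < len; i++) {
    if (buf[i] == delim) {
      count++;
      start = i + 1;
    }
  }
  if (start < len && allow_partial)
    count++; /* trailing bytes with no delimiter */
  return count;
}
```

For the 8 byte input "a\nbb\nccc", the count is 2 by default and 3 when partial records are allowed.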


void ac_in_options_abort_on_partial(ac_in_options_t *h);

ac_in_options_abort_on_partial causes the program to abort if a partial record exists at the end of a file. Normally, the partial record would be silently dropped (unless partial records are allowed via ac_in_options_allow_partial_records).


void ac_in_options_abort_on_file_not_found(ac_in_options_t *h);

ac_in_options_abort_on_file_not_found causes abort() to be called if a file is not found (instead of treating it as an empty file).


void ac_in_options_abort_on_file_empty(ac_in_options_t *h);

ac_in_options_abort_on_file_empty causes abort() to be called if a file is empty.


void ac_in_options_tag(ac_in_options_t *h, int tag);

ac_in_options_tag sets a tag on a file. This tag can be useful to distinguish one file from another, as all of the records from the given file will have the associated tag.


void ac_in_options_gz(ac_in_options_t *h, size_t buffer_size);

ac_in_options_gz indicates that the contents of the input are gzipped and sets the compressed buffer size (which defaults to the regular buffer size). This is only needed when not using a filename; normally, input is determined to be gzipped if the filename ends in .gz.


void ac_in_options_lz4(ac_in_options_t *h, size_t buffer_size);

ac_in_options_lz4 indicates that the contents of the input are lz4 compressed and sets the compressed buffer size (which defaults to the regular buffer size). This is only needed when not using a filename; normally, input is determined to be lz4 compressed if the filename ends in .lz4.


void ac_in_options_compressed_buffer_size(ac_in_options_t *h,
                                          size_t buffer_size);

ac_in_options_compressed_buffer_size sets the size of the buffer used to hold compressed content. If it is not set, it defaults to the regular buffer size.


void ac_in_options_reducer(ac_in_options_t *h, ac_io_compare_f compare,
                           void *compare_arg, ac_io_reducer_f reducer,
                           void *reducer_arg);

ac_in_options_reducer sets the compare and reducer functions on a given input so that equal items are reduced within a single cursor. It is assumed that the contents are sorted and that the compare function passed in will form groups of one or more records.
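
Reducing equal items within one sorted input can be sketched as a single pass that folds each group into its first record. The example below is standalone; key_count_t and reduce_equal_keys are hypothetical, and summing counts is just one possible reduction.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical record: a key with an associated count. */
typedef struct {
  int key;
  int count;
} key_count_t;

/* Reduces adjacent items with equal keys in place by summing their
   counts. The items must already be sorted by key. Returns the number
   of items that remain. */
size_t reduce_equal_keys(key_count_t *items, size_t n) {
  assert(items != NULL || n == 0);
  if (n == 0)
    return 0;
  size_t w = 0; /* index of the group currently being accumulated */
  for (size_t r = 1; r < n; r++) {
    if (items[r].key == items[w].key)
      items[w].count += items[r].count; /* same group: fold it in */
    else
      items[++w] = items[r]; /* new group begins */
  }
  return w + 1;
}
```

Applied to (1,2), (1,3), (2,1), (3,4), (3,1), this leaves (1,5), (2,1), (3,5).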