AC's Map Reduce Part 2

Copyright 2020 Andy Curtis & Daniel Curtis

Introducing ac_schedule

In the previous AC map/reduce post, the ac_in/ac_out objects were used to locate files based upon a directory and one or more extensions, read those files a line at a time, lowercase each line, tokenize it, sort the tokens alphabetically, reduce the token frequencies, and finally sort by frequency descending with a secondary sort of tokens ascending.

Part two introduces ac_schedule. While the previous code is efficient, my macbook has 16 threaded cores and a lot of RAM, yet the code in part one is single threaded. ac_schedule lets you break up your work into smaller pieces, connect the pieces together, and it handles all of the execution. Part two builds upon part one (part one should not be skipped).

The ac_in/ac_out objects could be used to split up data manually and make the processing multithreaded. The ac_schedule object does this for you: it manages tasks, dependency chains, file handling, file naming, single task debugging, and much more.

Setting up ac_schedule

The ac_schedule object isn't particularly complicated; it just has a few parts to it. The first thing to do is to set up tasks. For now, these tasks will do nothing at all (other than return true to indicate that they didn't fail).

examples/mapreduce2/start.c

#include "ac_schedule.h"

bool do_nothing(ac_worker_t *w) { return true; }

bool setup_task(ac_task_t *task) {
  ac_task_runner(task, do_nothing);
  return true;
}

int main(int argc, char *argv[]) {
  ac_schedule_t *scheduler = ac_schedule_init(argc - 1, argv + 1, 2, 4, 10);

  ac_schedule_task(scheduler, "split", false, setup_task);
  ac_schedule_task(scheduler, "partition", true, setup_task);
  ac_schedule_task(scheduler, "first", true, setup_task);
  ac_schedule_task(scheduler, "all", true, setup_task);
  ac_schedule_task(scheduler, "multi", true, setup_task);

  ac_schedule_run(scheduler, ac_worker_complete);

  ac_schedule_destroy(scheduler);
  return 0;
}
$ make start
$ ./start
Finished multi[0] on thread 0 in 0.000ms
Finished first[0] on thread 2 in 0.000ms
Finished all[0] on thread 1 in 0.000ms
Finished partition[0] on thread 3 in 0.000ms
Finished split[0] on thread 0 in 0.000ms
Finished multi[1] on thread 2 in 0.000ms
Finished all[1] on thread 3 in 0.000ms
Finished first[1] on thread 0 in 0.000ms
Finished partition[1] on thread 1 in 0.000ms
ac_schedule_t *ac_schedule_init(int argc, char **args, size_t num_partitions,
                                size_t cpus, size_t ram);

ac_schedule_init takes the command line arguments, the number of partitions, the number of cpus, and the number of MB of RAM. The command line arguments are passed to ac_schedule_init to potentially allow arguments to control how processing will be done.

#include "ac_schedule.h"

...

int main(int argc, char *argv[]) {
  ac_schedule_t *scheduler = ac_schedule_init(argc - 1, argv + 1, 2, 4, 10);

In the example, the command line arguments are passed and the scheduler is set up to have 2 partitions, 4 cpus, and 10 MB of RAM.

ac_task_t *ac_schedule_task(ac_schedule_t *h, const char *task_name,
                            bool partitioned, ac_task_f setup);

Once the scheduler is initialized via ac_schedule_init, at least one task must be assigned to it. ac_schedule_task schedules a task by naming it, specifying whether or not it is partitioned, and providing a setup function to finish setting the task up. In our example, five tasks are assigned; the first one (named split) is not partitioned.

ac_schedule_task(scheduler, "split", false, setup_task);
ac_schedule_task(scheduler, "partition", true, setup_task);
ac_schedule_task(scheduler, "first", true, setup_task);
ac_schedule_task(scheduler, "all", true, setup_task);
ac_schedule_task(scheduler, "multi", true, setup_task);

Normally, each scheduled task would have a different setup function. In this case, all of the functions do the same thing (nothing except return true), so setup_task can be shared.

void ac_schedule_run(ac_schedule_t *h, ac_worker_f on_complete);

ac_schedule_run calls all of the setup functions specified via ac_schedule_task, works out how all of the tasks will ultimately run, and finally runs them. If the on_complete callback is not NULL, it will be called each time a task partition completes. ac_worker_complete is provided by ac_schedule.h/c (it prints the basic information seen in the output above).

  ac_schedule_run(scheduler, ac_worker_complete);

  ac_schedule_destroy(scheduler);
  return 0;
}

The scheduler is run and then destroyed.
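
Instead of ac_worker_complete, a custom on_complete callback can be supplied. The following is a minimal sketch; it only uses the partition and num_partitions members of ac_worker_t, which appear in code later in this post.

bool print_when_done(ac_worker_t *w) {
  /* called once a task's partition completes; same signature as a runner */
  printf("finished partition %zu of %zu\n", w->partition, w->num_partitions);
  return true;
}

...

  ac_schedule_run(scheduler, print_when_done);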

bool do_nothing(ac_worker_t *w) { return true; }

bool setup_task(ac_task_t *task) {
  ac_task_runner(task, do_nothing);
  return true;
}

The setup_task function is used to set up all 5 of the tasks. Typically, there would be one setup function per task. ac_task_runner sets the function that will be run for each partition of the given task. The do_nothing function returns true to indicate that it succeeded.

This is a very basic shell of a program. Here is a quick recap...

In the main function

  1. initialize the scheduler using ac_schedule_init
  2. add tasks to the scheduler using ac_schedule_task (each task will have a name, be partitioned or not, and have a setup function to be called later)
  3. run the scheduler
  4. destroy the scheduler

Next,

  1. Implement the setup routines to define each task
  2. Implement the routines that are referenced in the setup routines

Now, let's consider the output from before.

$ make start
$ ./start
Finished multi[0] on thread 0 in 0.000ms
Finished first[0] on thread 2 in 0.000ms
Finished all[0] on thread 1 in 0.000ms
Finished partition[0] on thread 3 in 0.000ms
Finished split[0] on thread 0 in 0.000ms
Finished multi[1] on thread 2 in 0.000ms
Finished all[1] on thread 3 in 0.000ms
Finished first[1] on thread 0 in 0.000ms
Finished partition[1] on thread 1 in 0.000ms

All of the tasks finish in 0.000ms, which is expected because the tasks do nothing except return true.

Notice that all of the tasks are run on threads; there is a thread for each cpu specified in ac_schedule_init. There are 5 tasks with [0] and 4 tasks with [1]. split[0] doesn't have a corresponding split[1] because split was defined as not being partitioned.

Run start again...

$ ./start
$

Nothing was output. This is because ac_schedule assumes the tasks don't need to rerun, as there is nothing to indicate that the tasks' inputs have changed. Let's run again with the -h option.

$ ./start -h
The scheduler is meant to aid in running tasks in parallel.
At the moment, it operates on a single host - but I'm planning
on improving it to support multiple computers.


-f|--force rerun selected tasks even if they don't need run

-t <task[:partitions]>[<task[:partitions]], select tasks and
   optionally partitions.  tasks are separated by vertical bars
   (|) and partitions are sub-selected by placing a colon and then
   the partitions.  The partitions can be a single partition
   number, a range separated by a - (1-3), or a list of single
   partitions or ranges separated by commas.  To select partitions
   1, 3, 4, and 5 of task named first_task
   -t first_task:1,3-5

-o  Normally, all tasks that are needed to run to complete
    selected task will run.  This will override that behavior
    and only run selected tasks

-d|--dump <filename1,[filename2],...> dump the contents of files

-p|--prefix <filename1,[filename2],...> dump the contents of files
    and prefix each line with the line number

-l|--list list details of execution (the plan)

-s|--show-files similar to list, except input and output files
     are also displayed.

-c|--cpus <num_cpus> overrides default number of cpus

-r|--ram <ram MB> overrides default ram usage

-h|--help show this help message

The first option is to force tasks to rerun even if they don't need to.

$ ./start -f
Finished multi[0] on thread 1 in 0.000ms
Finished first[0] on thread 3 in 0.000ms
Finished all[0] on thread 0 in 0.001ms
Finished partition[0] on thread 2 in 0.000ms
Finished split[0] on thread 1 in 0.000ms
Finished all[1] on thread 0 in 0.000ms
Finished first[1] on thread 2 in 0.000ms
Finished partition[1] on thread 1 in 0.000ms
Finished multi[1] on thread 3 in 0.001ms

The next option allows for task subselection. Maybe we only want to rerun partition 1 of first.

$ ./start -f -t first:1
Finished first[1] on thread 1 in 0.000ms

Or both partitions of first and the last multi partition.

$ ./start -f -t first:0-1 multi:1
Finished first[0] on thread 1 in 0.000ms
Finished first[1] on thread 2 in 0.000ms
Finished multi[1] on thread 0 in 0.000ms

If selecting all partitions of a task, the partition list can simply be omitted.

$ ./start -f -t first
Finished first[0] on thread 1 in 0.000ms
Finished first[1] on thread 2 in 0.000ms

By default, the scheduler will run all of the tasks that haven't been run. For example, if you were to remove the tasks folder and then try to run only first, the following would happen.

$ rm -rf tasks/
$ ./start -t first
Finished multi[0] on thread 0 in 0.000ms
Finished partition[0] on thread 2 in 0.000ms
Finished all[0] on thread 1 in 0.000ms
Finished first[0] on thread 3 in 0.000ms
Finished split[0] on thread 0 in 0.000ms
Finished multi[1] on thread 2 in 0.000ms
Finished first[1] on thread 3 in 0.000ms
Finished partition[1] on thread 0 in 0.000ms
Finished all[1] on thread 1 in 0.000ms

If the -o option is used, only the selected tasks will run (the tasks they depend upon will not be run first).

$ rm -rf tasks
$ ./start -t first -o
Finished first[0] on thread 0 in 0.000ms
Finished first[1] on thread 1 in 0.000ms

The -l option lists the tasks and information about them. At this point, there is very little information about each task: a name, partition information, and a custom runner. The -s option is similar to -l, except that it also displays input and output files (when available).

$ ./start -l
task: multi [0/2]
  custom runner
task: all [0/2]
  custom runner
task: first [0/2]
  custom runner
task: partition [0/2]
  custom runner
task: split [0/1]
  custom runner
task: multi [1/2]
  custom runner
task: all [1/2]
  custom runner
task: first [1/2]
  custom runner
task: partition [1/2]
  custom runner

The -d, -s, and -p options will make sense in a bit.

The -c option allows the number of cpus to be controlled.

$ ./start -f -c 1
Finished multi[0] on thread 0 in 0.000ms
Finished all[0] on thread 0 in 0.000ms
Finished first[0] on thread 0 in 0.000ms
Finished partition[0] on thread 0 in 0.000ms
Finished split[0] on thread 0 in 0.000ms
Finished multi[1] on thread 0 in 0.000ms
Finished all[1] on thread 0 in 0.000ms
Finished first[1] on thread 0 in 0.001ms
Finished partition[1] on thread 0 in 0.000ms

Notice that everything runs on thread 0 because only one cpu was specified.

The -r option is similar to the -c option, except that it controls how much RAM can be used.

In the above example, it may be desirable for a given task to run every time without having to use the -f option. ac_task_run_everytime is meant to be called from the setup function to do just that.

bool setup_task(ac_task_t *task) {
  ac_task_run_everytime(task);
  ac_task_runner(task, do_nothing);
  return true;
}

Ordering the tasks

In the last section, the tasks ran in a somewhat random order. This would be okay if the tasks were in no way dependent upon each other, but that's rarely the case. Let's consider the code that assigns tasks to the scheduler.

ac_schedule_task(scheduler, "split", false, setup_task);
ac_schedule_task(scheduler, "partition", true, setup_task);
ac_schedule_task(scheduler, "first", true, setup_task);
ac_schedule_task(scheduler, "all", true, setup_task);
ac_schedule_task(scheduler, "multi", true, setup_task);

It may be desirable to have each task depend upon the previous task, except multi, which will depend upon all of the other tasks. In order to do this, a setup function will have to be created for each task.

ac_schedule_task(scheduler, "split", false, setup_split);
ac_schedule_task(scheduler, "partition", true, setup_partition);
ac_schedule_task(scheduler, "first", true, setup_first);
ac_schedule_task(scheduler, "all", true, setup_all);
ac_schedule_task(scheduler, "multi", true, setup_multi);

Next, each of those functions will have to be created.

bool setup_split(ac_task_t *task) {
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_first(ac_task_t *task) {
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_partition(ac_task_t *task) {
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_all(ac_task_t *task) {
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_multi(ac_task_t *task) {
  ac_task_runner(task, do_nothing);
  return true;
}

For this example, we can continue with the do_nothing which just returns true.

bool ac_task_dependency(ac_task_t *task, const char *dependency);
bool ac_task_partial_dependency(ac_task_t *task, const char *dependency);

ac_task_dependency creates a full dependency upon the listed tasks (const char *dependency is a vertical bar separated list of task names). ac_task_partial_dependency creates a dependency upon only the matching partition of the named task (unless that task isn't partitioned, in which case it is the same as ac_task_dependency).

The following will wire up the dependencies.

bool setup_split(ac_task_t *task) {
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_first(ac_task_t *task) {
  ac_task_dependency(task, "split");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_partition(ac_task_t *task) {
  ac_task_dependency(task, "first");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_all(ac_task_t *task) {
  ac_task_partial_dependency(task, "partition");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_multi(ac_task_t *task) {
  ac_task_dependency(task, "all|partition|first|split");
  ac_task_runner(task, do_nothing);
  return true;
}

The code for this is found in examples/mapreduce2/order_tasks.c. Deleting the tasks directory will clean up all information from previous runs (from the last section, for example).

$ rm -rf tasks
$ make order_tasks
$ ./order_tasks
Finished split[0] on thread 2 in 0.000ms
Finished first[0] on thread 2 in 0.001ms
Finished first[1] on thread 1 in 0.000ms
Finished partition[0] on thread 1 in 0.000ms
Finished all[0] on thread 1 in 0.000ms
Finished partition[1] on thread 2 in 0.000ms
Finished all[1] on thread 2 in 0.000ms
Finished multi[0] on thread 2 in 0.000ms
Finished multi[1] on thread 3 in 0.000ms

Notice how all of the tasks run in order. all and partition are ordered on a per-partition basis: all[0] runs after partition[0] and all[1] runs after partition[1].

In the last section, the -l option showed very little information. Rerunning it now will show more detail.

$ ./order_tasks -l
task: split [0/1]
  reverse dependencies:  multi[2] first[2]
  custom runner
task: first [0/2]
  dependencies:  split[1]
  reverse dependencies:  multi[2] partition[2]
  custom runner
task: first [1/2]
  dependencies:  split[1]
  reverse dependencies:  multi[2] partition[2]
  custom runner
task: partition [0/2]
  dependencies:  first[2]
  reverse dependencies:  multi[2]
  reverse partial dependencies:  all[2]
  custom runner
task: all [0/2]
  reverse dependencies:  multi[2]
  partial dependencies:  partition[2]
  custom runner
task: partition [1/2]
  dependencies:  first[2]
  reverse dependencies:  multi[2]
  reverse partial dependencies:  all[2]
  custom runner
task: all [1/2]
  reverse dependencies:  multi[2]
  partial dependencies:  partition[2]
  custom runner
task: multi [0/2]
  dependencies:  split[1] first[2] partition[2] all[2]
  custom runner
task: multi [1/2]
  dependencies:  split[1] first[2] partition[2] all[2]
  custom runner

This shows that each multi partition depends upon split, first, partition, and all. It shows that all has a reverse dependency of multi (multi depends upon all). all also has a partial dependency upon partition. The scheduler makes sure that the dependencies are complete before a given task/partition can run.

examples/mapreduce2/order_tasks.c

#include "ac_schedule.h"

bool do_nothing(ac_worker_t *w) { return true; }

bool setup_split(ac_task_t *task) {
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_first(ac_task_t *task) {
  ac_task_dependency(task, "split");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_partition(ac_task_t *task) {
  ac_task_dependency(task, "first");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_all(ac_task_t *task) {
  ac_task_partial_dependency(task, "partition");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_multi(ac_task_t *task) {
  ac_task_dependency(task, "all|partition|first|split");
  ac_task_runner(task, do_nothing);
  return true;
}

int main(int argc, char *argv[]) {
  ac_schedule_t *scheduler = ac_schedule_init(argc - 1, argv + 1, 2, 4, 10);

  ac_schedule_task(scheduler, "split", false, setup_split);
  ac_schedule_task(scheduler, "partition", true, setup_partition);
  ac_schedule_task(scheduler, "first", true, setup_first);
  ac_schedule_task(scheduler, "all", true, setup_all);
  ac_schedule_task(scheduler, "multi", true, setup_multi);

  ac_schedule_run(scheduler, ac_worker_complete);

  ac_schedule_destroy(scheduler);
  return 0;
}

Incorporating input files

Each task within the scheduler will typically have inputs and outputs, and in general those inputs and outputs will be files. Within a larger overall set of tasks, there will be inputs which originate outside the scheduler (content produced by other systems). Those should be the only inputs that need to be defined. All other inputs should be artifacts and/or outputs of other tasks.

In the completely contrived example from the last section, the task named "split" feeds every other task. This section focuses on how to get input into the scheduler.

The first change is to add a line to setup_split indicating that there is outside input, using ac_task_input_files. ac_task_input_files expects the task, an arbitrary name for the input, a percentage (0.0-1.0) of RAM that can be used for the given input, and a function to call to get the list of input files.

void ac_task_input_files(ac_task_t *task, const char *name, double ram_pct,
                         ac_worker_file_info_f file_info);

The callback has the following signature:

ac_io_file_info_t *file_info(ac_worker_t *w, size_t *num_files,
                             ac_worker_input_t *inp);

Within the scheduler, there are tasks and workers. A task is a job to do that may be partitioned. A worker is a particular partition of a task being run. The setup routine defines how workers accomplish their goal.

bool setup_split(ac_task_t *task) {
  ac_task_input_files(task, "split_input", 0.1, get_input_files);
  ac_task_runner(task, do_nothing);
  return true;
}

The ac_task_input_files function is called within setup_split; it should be called prior to ac_task_runner. The second change is get_input_files. If split had multiple partitions (it does not in this case), each call to get_input_files should return only the input files which relate to the given partition.

bool file_ok(const char *filename, void *arg) {
  char **extensions = (char **)arg;
  char **p = extensions;
  while (*p) {
    if (ac_io_extension(filename, *p))
      return true;
    p++;
  }
  return false;
}

ac_io_file_info_t *get_input_files(ac_worker_t *w, size_t *num_files,
                                   ac_worker_input_t *inp) {
  *num_files = 0;
  char **extensions = ac_pool_split(w->worker_pool, NULL, ',', "tbontb");
  size_t num_inputs = 0;
  ac_io_file_info_t *inputs = ac_pool_io_list(w->worker_pool, "sample",
                                              &num_inputs, file_ok, extensions);
  *num_files = num_inputs;
  return inputs;
}

In Listing the Files from the first post about map/reduce, there is an explanation of how to use the AC library to list files (file_ok and ac_io_list are explained there). The worker (w) has two pools that can be used. Memory allocated from worker_pool remains valid for the entire duration of the worker (the processing of a particular partition of a task). pool is the second member of ac_worker_t and is meant to be cleared frequently (after every record or group of records). Because get_input_files runs before the runner (do_nothing at the moment), it should use worker_pool. In Listing the Files, ac_split and ac_io_list were used; ac_pool_split and ac_pool_io_list are alternatives that allocate from a pool, so there is no need to free the memory later.
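
As a sketch of the distinction between the two pools (this assumes ac_pool_clear from the ac_pool interface, which releases a pool's allocations so the memory can be reused):

bool my_runner(ac_worker_t *w) {
  /* worker_pool: lives for the entire worker */
  char *state = (char *)ac_pool_alloc(w->worker_pool, 256);

  for (size_t i = 0; i < 1000; i++) {
    /* pool: scratch space for a record or group of records */
    char *scratch = (char *)ac_pool_alloc(w->pool, 64);
    (void)scratch; /* ... process one record ... */
    ac_pool_clear(w->pool); /* cleared frequently */
  }
  (void)state;
  return true;
}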

$ make input_data_1
$ ./input_data_1 -s -t split
task: split [0/1]
  reverse dependencies:  multi[2] first[2]
  custom runner
      input[0]: split_input (1)
        sample/sample.tbontb (20)

This task still doesn't do anything, but by using the -s option (and -t split to show only the split task), you can see that the sample/sample.tbontb file is considered input. The code is not particularly useful yet since the input files are hard coded into get_input_files. That will be remedied in the next section.

$ rm -rf tasks
$ ./input_data_1
Finished split[0] on thread 1 in 0.000ms
Finished first[1] on thread 2 in 0.000ms
Finished first[0] on thread 0 in 0.000ms
Finished partition[0] on thread 0 in 0.000ms
Finished partition[1] on thread 3 in 0.000ms
Finished all[0] on thread 0 in 0.000ms
Finished all[1] on thread 3 in 0.000ms
Finished multi[0] on thread 3 in 0.000ms
Finished multi[1] on thread 0 in 0.000ms
$ ./input_data_1
$

Running input_data_1 a second time doesn't do anything because the scheduler doesn't see anything to do. However, if you touch sample/sample.tbontb, everything will rerun because split is the source of every other task.

$ touch sample/sample.tbontb
$ ./input_data_1
Finished split[0] on thread 1 in 0.000ms
Finished first[1] on thread 2 in 0.000ms
Finished first[0] on thread 0 in 0.000ms
Finished partition[0] on thread 0 in 0.000ms
Finished partition[1] on thread 3 in 0.000ms
Finished all[0] on thread 0 in 0.000ms
Finished all[1] on thread 3 in 0.000ms
Finished multi[0] on thread 3 in 0.000ms
Finished multi[1] on thread 0 in 0.000ms
$

examples/mapreduce2/input_data_1.c

#include "ac_schedule.h"

bool file_ok(const char *filename, void *arg) {
  char **p = (char **)arg;
  while (*p) {
    if (ac_io_extension(filename, *p))
      return true;
    p++;
  }
  return false;
}

ac_io_file_info_t *get_input_files(ac_worker_t *w, size_t *num_files,
                                   ac_worker_input_t *inp) {
  *num_files = 0;
  char **extensions = ac_pool_split(w->worker_pool, NULL, ',', "tbontb");
  size_t num_inputs = 0;
  ac_io_file_info_t *inputs = ac_pool_io_list(w->worker_pool, "sample",
                                              &num_inputs, file_ok, extensions);
  *num_files = num_inputs;
  return inputs;
}

bool do_nothing(ac_worker_t *w) { return true; }

bool setup_split(ac_task_t *task) {
  ac_task_input_files(task, "split_input", 0.1, get_input_files);
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_first(ac_task_t *task) {
  ac_task_dependency(task, "split");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_partition(ac_task_t *task) {
  ac_task_dependency(task, "first");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_all(ac_task_t *task) {
  ac_task_partial_dependency(task, "partition");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_multi(ac_task_t *task) {
  ac_task_dependency(task, "all|partition|first|split");
  ac_task_runner(task, do_nothing);
  return true;
}

int main(int argc, char *argv[]) {
  ac_schedule_t *scheduler = ac_schedule_init(argc - 1, argv + 1, 2, 4, 10);

  ac_schedule_task(scheduler, "split", false, setup_split);
  ac_schedule_task(scheduler, "partition", true, setup_partition);
  ac_schedule_task(scheduler, "first", true, setup_first);
  ac_schedule_task(scheduler, "all", true, setup_all);
  ac_schedule_task(scheduler, "multi", true, setup_multi);

  ac_schedule_run(scheduler, ac_worker_complete);

  ac_schedule_destroy(scheduler);
  return 0;
}

Command Line Arguments

In the last example, the directory and the file extension for the input files were hard coded. This section shows how to use command line arguments and configuration. Adding arguments involves a fair amount of code, but it's pretty straightforward.

In the main function, ac_schedule_custom_args registers a custom usage function, a custom argument parser, and a function that runs once all arguments have been processed.
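
A prototype for ac_schedule_custom_args isn't shown above; the following sketch is inferred from the call and the callbacks below (the exact typedef names in ac_schedule.h may differ).

void ac_schedule_custom_args(ac_schedule_t *h, void (*custom_usage)(),
                             int (*parse_args)(int argc, char **argv,
                                               void *arg),
                             bool (*finish_args)(int argc, char **argv,
                                                 void *arg),
                             void *arg);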

typedef struct {
  size_t num_inputs;
  ac_io_file_info_t *inputs;
  char *dir;
  char *ext;
  char **extensions;
} custom_arg_t;

...

int main(int argc, char *argv[]) {
  ac_schedule_t *scheduler = ac_schedule_init(argc - 1, argv + 1, 2, 4, 10);
  custom_arg_t custom;
  memset(&custom, 0, sizeof(custom));

  ac_schedule_custom_args(scheduler, custom_usage, parse_custom_args,
                          finish_custom_args, &custom);
  ...
  if (custom.inputs)
    ac_free(custom.inputs);
  if (custom.extensions)
    ac_free(custom.extensions);

The custom_usage function just uses printf.

void custom_usage() {
  printf("Select files within a given directory and do nothing!\n");
  printf("--dir <dir> - directory to scan\n");
  printf("--ext <extensions> - comma delimited list of file extensions to "
         "consider");
}

parse_custom_args returns 0 if there is a problem, otherwise the number of arguments to skip. In our example, we are using argument pairs and returning 2 if the first argument is --dir or --ext.

int parse_custom_args(int argc, char **argv, void *arg) {
  if (argc < 2)
    return 0;

  custom_arg_t *ca = (custom_arg_t *)arg;

  if (!strcmp(argv[0], "--dir")) {
    ca->dir = argv[1];
    return 2;
  } else if (!strcmp(argv[0], "--ext")) {
    ca->ext = argv[1];
    return 2;
  }
  return 0;
}

finish_custom_args is called after all of the args are parsed.

bool finish_custom_args(int argc, char **argv, void *arg) {
  custom_arg_t *ca = (custom_arg_t *)arg;
  if (!ca->dir)
    ca->dir = (char *)"sample";
  if (!ca->ext)
    ca->ext = (char *)"tbontb";

  ca->extensions = ac_split2(NULL, ',', ca->ext);
  ca->inputs = ac_io_list(ca->dir, &ca->num_inputs, file_ok, ca->extensions);
  if (ca->inputs)
    return true;
  return false;
}

Finally, ac_task_custom_arg retrieves the custom_arg_t structure from any function which receives an ac_worker_t.

ac_io_file_info_t *get_input_files(ac_worker_t *w, size_t *num_files,
                                   ac_worker_input_t *inp) {
  custom_arg_t *ca = (custom_arg_t *)ac_task_custom_arg(w->task);
  *num_files = ca->num_inputs;
  return ca->inputs;
}

One issue with command line arguments is that they need to be the same every time. To enforce this, the scheduler saves the arguments and requires the parameter --new-args if they change. The custom_arg_t structure will remain in memory for the life of the scheduler and is expected to be accessed in a thread-safe manner.

$ make input_data_2
$ rm -rf tasks
$ ./input_data_2 --dir .. --ext c,txt
Finished split[0] on thread 2 in 0.000ms
Finished first[0] on thread 2 in 0.000ms
Finished first[1] on thread 1 in 0.000ms
Finished partition[0] on thread 1 in 0.001ms
Finished partition[1] on thread 0 in 0.000ms
Finished all[0] on thread 1 in 0.000ms
Finished all[1] on thread 0 in 0.000ms
Finished multi[0] on thread 0 in 0.000ms
Finished multi[1] on thread 3 in 0.000ms
$ ./input_data_2 -s -t split
task: split [0/1]
  reverse dependencies:  multi[2] first[2]
  custom runner
      input[0]: split_input (74)
        ../ac_json/ac_json_dump_error_to_buffer.c (726)
        ../ac_json/ac_json_parse.c (1099)
        ../ac_json/ac_json_dump_error.c (627)
        ../ac_buffer/ac_buffer_sets.c (275)
        ../ac_buffer/ac_buffer_append.c (325)
        ../ac_buffer/ac_buffer_destroy.c (287)
        ../ac_buffer/ac_buffer_appends.c (296)
        ../ac_buffer/ac_buffer_clear.c (488)
$ ./input_data_2 --dir .. --ext md
[ERROR] Command line arguments don't match (c,txt != md) - (use --new-args?)

Find all tokens ending in .h, .c, and .md and sort by
frequency descending.

--dir <dir> - directory to scan
--ext <extensions> - comma delimited list of file extensions to consider
----------------------------------------------------------

The scheduler is meant to aid in running tasks in parallel.

In general, it would be undesirable for the parameters of a job to change without first cleaning up the overall set of tasks. This error is shown to prevent data corruption and to warn the user that they might be making a mistake. The --new-args option will override the error and continue.

examples/mapreduce2/input_data_2.c

#include "ac_schedule.h"

typedef struct {
  size_t num_inputs;
  ac_io_file_info_t *inputs;
  char *dir;
  char *ext;
  char **extensions;
} custom_arg_t;

bool file_ok(const char *filename, void *arg) {
  custom_arg_t *ca = (custom_arg_t *)arg;
  char **p = ca->extensions;
  while (*p) {
    if (ac_io_extension(filename, *p))
      return true;
    p++;
  }
  return false;
}

bool finish_custom_args(int argc, char **argv, void *arg) {
  custom_arg_t *ca = (custom_arg_t *)arg;
  if (!ca->dir)
    ca->dir = (char *)"sample";
  if (!ca->ext)
    ca->ext = (char *)"tbontb";

  ca->extensions = ac_split2(NULL, ',', ca->ext);
  ca->inputs = ac_io_list(ca->dir, &ca->num_inputs, file_ok, ca);
  if (ca->inputs)
    return true;
  return false;
}

int parse_custom_args(int argc, char **argv, void *arg) {
  if (argc < 2)
    return 0;

  custom_arg_t *ca = (custom_arg_t *)arg;

  if (!strcmp(argv[0], "--dir")) {
    ca->dir = argv[1];
    return 2;
  } else if (!strcmp(argv[0], "--ext")) {
    ca->ext = argv[1];
    return 2;
  }
  return 0;
}

ac_io_file_info_t *get_input_files(ac_worker_t *w, size_t *num_files,
                                   ac_worker_input_t *inp) {
  custom_arg_t *ca = (custom_arg_t *)ac_task_custom_arg(w->task);
  *num_files = ca->num_inputs;
  return ca->inputs;
}

bool do_nothing(ac_worker_t *w) { return true; }

bool setup_split(ac_task_t *task) {
  ac_task_input_files(task, "split_input", 0.1, get_input_files);
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_first(ac_task_t *task) {
  ac_task_dependency(task, "split");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_partition(ac_task_t *task) {
  ac_task_dependency(task, "first");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_all(ac_task_t *task) {
  ac_task_partial_dependency(task, "partition");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_multi(ac_task_t *task) {
  ac_task_dependency(task, "all|partition|first|split");
  ac_task_runner(task, do_nothing);
  return true;
}

void custom_usage() {
  printf("Find all tokens ending in .h, .c, and .md and sort by\n");
  printf("frequency descending.\n\n");
  printf("--dir <dir> - directory to scan\n");
  printf("--ext <extensions> - comma delimited list of file extensions to "
         "consider");
}

int main(int argc, char *argv[]) {
  ac_schedule_t *scheduler = ac_schedule_init(argc - 1, argv + 1, 2, 4, 10);
  custom_arg_t custom;
  memset(&custom, 0, sizeof(custom));

  ac_schedule_custom_args(scheduler, custom_usage, parse_custom_args,
                          finish_custom_args, &custom);

  ac_schedule_task(scheduler, "split", false, setup_split);
  ac_schedule_task(scheduler, "partition", true, setup_partition);
  ac_schedule_task(scheduler, "first", true, setup_first);
  ac_schedule_task(scheduler, "all", true, setup_all);
  ac_schedule_task(scheduler, "multi", true, setup_multi);

  ac_schedule_run(scheduler, ac_worker_complete);

  if (custom.inputs)
    ac_free(custom.inputs);
  if (custom.extensions)
    ac_free(custom.extensions);

  ac_schedule_destroy(scheduler);
  return 0;
}

Splitting up the input jobs

In the previous section, the split task's input was set up using command line arguments. The split task was configured to not be partitioned. In this section, it will be changed to a partitioned task. In the main file, this is a relatively small change.

ac_schedule_task(scheduler, "split", false, setup_split);

changes to

ac_schedule_task(scheduler, "split", true, setup_split);

This alone would be enough if it were okay for every partition to parse all of the input files. In our case, however, we want each partition to process only some of the input files. The files could be divided by taking every Nth file or by using a hash. The advantage of a hash over every Nth file is stability: with every Nth file, adding a file to or removing a file from the file system shifts files from one partition to another, which may cause some partitions to not rerun as they should. With a hash, each file stays in the same partition. For our example, we will use a hash of the filename; the ac_io_file_info_t structure has a member hash which is the hash of the filename.
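
To make the stability argument concrete, here is a sketch of the two selection tests (the every-Nth variant is shown only for contrast; the hash member of ac_io_file_info_t is the only field assumed):

/* depends upon the file's index in the listing, which shifts when
   files are added or removed */
bool in_partition_every_nth(size_t index, size_t partition,
                            size_t num_partitions) {
  return (index % num_partitions) == partition;
}

/* depends only upon the filename's hash, so each file stays in its
   partition regardless of what else is in the directory */
bool in_partition_by_hash(ac_io_file_info_t *fi, size_t partition,
                          size_t num_partitions) {
  return (fi->hash % num_partitions) == partition;
}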

The get_input_files function needs to change to include only the appropriate files.

ac_io_file_info_t *get_input_files(ac_worker_t *w, size_t *num_files,
                                   ac_worker_input_t *inp) {
  custom_arg_t *ca = (custom_arg_t *)ac_task_custom_arg(w->task);
  *num_files = ca->num_inputs;
  return ca->inputs;
}

changes to

ac_io_file_info_t *get_input_files(ac_worker_t *w, size_t *num_files,
                                   ac_worker_input_t *inp) {
  custom_arg_t *ca = (custom_arg_t *)ac_task_custom_arg(w->task);
  ac_io_file_info_t *p = ca->inputs;
  ac_io_file_info_t *ep = p + ca->num_inputs;
  /* count the inputs whose hash maps to this worker's partition */
  size_t num_matching = 0;
  while (p < ep) {
    if ((p->hash % w->num_partitions) == w->partition)
      num_matching++;
    p++;
  }
  *num_files = num_matching;
  if (!num_matching)
    return NULL;
  p = ca->inputs;
  ac_io_file_info_t *res = (ac_io_file_info_t *)ac_pool_alloc(
      w->worker_pool, sizeof(ac_io_file_info_t) * num_matching);
  ac_io_file_info_t *wp = res;
  while (p < ep) {
    if ((p->hash % w->num_partitions) == w->partition) {
      *wp = *p;
      wp++;
    }
    p++;
  }
  return res;
}

This will count the number of inputs that belong to this partition, allocate an array using the pool that persists for the duration of the worker, fill the array, and return it. ac_io provides a function named ac_io_select_file_info which does most of what the above function does.

ac_io_file_info_t *ac_io_select_file_info(ac_pool_t *pool, size_t *num_res,
                                          ac_io_file_info_t *inputs,
                                          size_t num_inputs, size_t partition,
                                          size_t num_partitions);

Using that function, the new get_input_files...

ac_io_file_info_t *get_input_files(ac_worker_t *w, size_t *num_files,
                                   ac_worker_input_t *inp) {
  custom_arg_t *ca = (custom_arg_t *)ac_task_custom_arg(w->task);
  return ac_io_select_file_info(w->worker_pool, num_files,
                                ca->inputs, ca->num_inputs,
                                w->partition, w->num_partitions);
}
$ rm -rf tasks
$ ./input_data_3 --dir .. --ext c
Finished split[0] on thread 0 in 0.000ms
Finished split[1] on thread 2 in 0.000ms
Finished first[0] on thread 2 in 0.000ms
Finished first[1] on thread 2 in 0.000ms
Finished partition[0] on thread 2 in 0.000ms
Finished all[0] on thread 2 in 0.000ms
Finished partition[1] on thread 1 in 0.000ms
Finished all[1] on thread 1 in 0.000ms
Finished multi[0] on thread 1 in 0.000ms
Finished multi[1] on thread 3 in 0.000ms
$ ./input_data_3 -s -t split
task: split [0/2]
  reverse dependencies:  multi[2] first[2]
  custom runner
      input[0]: split_input (25)
        ../ac_buffer/ac_buffer_append.c (325)
        ../ac_buffer/ac_buffer_appends.c (296)
        ../ac_buffer/ac_buffer_clear.c (488)
        ../ac_buffer/ac_buffer_init.c (287)
        ...        
task: split [1/2]
  reverse dependencies:  multi[2] first[2]
  custom runner
      input[0]: split_input (47)
        ../ac_json/ac_json_dump_error_to_buffer.c (726)
        ../ac_json/ac_json_parse.c (1099)
        ../ac_json/ac_json_dump_error.c (627)
        ../ac_buffer/ac_buffer_sets.c (275)
        ...

examples/mapreduce2/input_data_3.c

#include "ac_schedule.h"

typedef struct {
  size_t num_inputs;
  ac_io_file_info_t *inputs;
  char *dir;
  char *ext;
  char **extensions;
} custom_arg_t;

bool file_ok(const char *filename, void *arg) {
  custom_arg_t *ca = (custom_arg_t *)arg;
  char **p = ca->extensions;
  while (*p) {
    if (ac_io_extension(filename, *p))
      return true;
    p++;
  }
  return false;
}

bool finish_custom_args(int argc, char **argv, void *arg) {
  custom_arg_t *ca = (custom_arg_t *)arg;
  if (!ca->dir)
    ca->dir = (char *)"sample";
  if (!ca->ext)
    ca->ext = (char *)"tbontb";

  ca->extensions = ac_split2(NULL, ',', ca->ext);
  ca->inputs = ac_io_list(ca->dir, &ca->num_inputs, file_ok, ca);
  if (ca->inputs)
    return true;
  return false;
}

int parse_custom_args(int argc, char **argv, void *arg) {
  if (argc < 2)
    return 0;

  custom_arg_t *ca = (custom_arg_t *)arg;

  if (!strcmp(argv[0], "--dir")) {
    ca->dir = argv[1];
    return 2;
  } else if (!strcmp(argv[0], "--ext")) {
    ca->ext = argv[1];
    return 2;
  }
  return 0;
}

ac_io_file_info_t *get_input_files(ac_worker_t *w, size_t *num_files,
                                   ac_worker_input_t *inp) {
  custom_arg_t *ca = (custom_arg_t *)ac_task_custom_arg(w->task);
  return ac_io_select_file_info(w->worker_pool, num_files, ca->inputs,
                                ca->num_inputs, w->partition,
                                w->num_partitions);
}

bool do_nothing(ac_worker_t *w) { return true; }

bool setup_split(ac_task_t *task) {
  ac_task_input_files(task, "split_input", 0.1, get_input_files);
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_first(ac_task_t *task) {
  ac_task_dependency(task, "split");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_partition(ac_task_t *task) {
  ac_task_dependency(task, "first");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_all(ac_task_t *task) {
  ac_task_partial_dependency(task, "partition");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_multi(ac_task_t *task) {
  ac_task_dependency(task, "all|partition|first|split");
  ac_task_runner(task, do_nothing);
  return true;
}

void custom_usage() {
  printf("Find all tokens ending in .h, .c, and .md and sort by\n");
  printf("frequency descending.\n\n");
  printf("--dir <dir> - directory to scan\n");
  printf("--ext <extensions> - comma delimited list of file extensions to "
         "consider");
}

int main(int argc, char *argv[]) {
  ac_schedule_t *scheduler = ac_schedule_init(argc - 1, argv + 1, 2, 4, 10);
  custom_arg_t custom;
  memset(&custom, 0, sizeof(custom));

  ac_schedule_custom_args(scheduler, custom_usage, parse_custom_args,
                          finish_custom_args, &custom);

  ac_schedule_task(scheduler, "split", true, setup_split);
  ac_schedule_task(scheduler, "partition", true, setup_partition);
  ac_schedule_task(scheduler, "first", true, setup_first);
  ac_schedule_task(scheduler, "all", true, setup_all);
  ac_schedule_task(scheduler, "multi", true, setup_multi);

  ac_schedule_run(scheduler, ac_worker_complete);

  if (custom.inputs)
    ac_free(custom.inputs);
  if (custom.extensions)
    ac_free(custom.extensions);

  ac_schedule_destroy(scheduler);
  return 0;
}

Ack files

In the last section, input files were wired up to the split task. We saw how touching the input files would cause split to run over again. Inside the tasks folder that is created by the scheduler, there is an ack folder. After each partition is run, a file is written to the ack folder. If the ack file for a particular task/partition is removed, that task/partition will run again, along with all tasks that depend upon it.

$ ls tasks
ack		all_0		all_1		first_0		first_1		multi_0		multi_1		partition_0	partition_1	split_0
$ ls tasks/ack
all_0		all_1		first_0		first_1		multi_0		multi_1		partition_0	partition_1	split_0

Consider the partition:0 task/partition...

$ ./input_data_3 -l -t partition:0
task: partition [0/2]
  dependencies:  first[2]
  reverse dependencies:  multi[2]
  reverse partial dependencies:  all[2]
  custom runner

There is a full reverse dependency of multi and a reverse partial dependency of all. If tasks/ack/partition_0 is removed, it should cause partition[0], all[0], multi[0], and multi[1] to run.

$ rm tasks/ack/partition_0
$ ./input_data_3
Finished partition[0] on thread 1 in 0.000ms
Finished all[0] on thread 1 in 0.000ms
Finished multi[0] on thread 1 in 0.000ms
Finished multi[1] on thread 2 in 0.000ms

If tasks/ack/partition_0 is touched instead of removed, then only all[0], multi[0], and multi[1] will run. The ack file is touched after a partition finishes, so touching the ack file is equivalent to the task/partition having finished at the moment the ack file was touched.

$ touch tasks/ack/partition_0
$ ./input_data_3
Finished all[0] on thread 0 in 0.000ms
Finished multi[0] on thread 0 in 0.000ms
Finished multi[1] on thread 1 in 0.000ms

Dumping Input

The input files for partition 0 of the split task can be shown as follows...

$ ./input_data_3 -s -t split:0
task: split [0/2]
  reverse dependencies:  multi[2] first[2]
  custom runner
      input[0]: split_input (28)
        ../ac_buffer/ac_buffer_append.c (325)
        ../ac_buffer/ac_buffer_appends.c (296)
        ../ac_buffer/ac_buffer_clear.c (488)
        ../ac_buffer/ac_buffer_init.c (287)
        ../ac_buffer/ac_buffer_shrink_by.c (299)
        ../ac_buffer/ac_buffer_appendn.c (583)
        ../ac_buffer/ac_buffer_data.c (287)
        ../ac_buffer/ac_buffer_setn.c (266)

If you run ./input_data_3 -h, the help shows the following two options...

$ ./input_data_3 -h

   ...

-d|--dump <filename1,[filename2],...> dump the contents of files

-p|--prefix <filename1,[filename2],...> dump the contents of files
    and prefix each line with the line number

   ...

We can try to dump one of the input files...

$ ./input_data_3 -d ../ac_buffer/ac_buffer_setn.c
$

and nothing happens. This is because the scheduler doesn't know how to dump the input files. ac_schedule.h/c provides a function which dumps text; it is defined as follows.

void ac_task_dump_text(ac_worker_t *w, ac_io_record_t *r, ac_buffer_t *bh,
                       void *arg) {
  ac_buffer_appends(bh, r->record);
}

ac_task_dump_text is a very simple function which appends the contents of the record to the buffer. If the format of the input file is not plain text, then you will need to implement your own dump function which dumps text to the buffer.
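
For instance, if the input were binary records consisting of a 4-byte frequency followed by token text (a hypothetical format, not one used in this post), a custom dump function might look like the following sketch; it only uses ac_buffer_appends, which is shown above.

void dump_token_freq(ac_worker_t *w, ac_io_record_t *r, ac_buffer_t *bh,
                     void *arg) {
  /* hypothetical record layout: 4-byte frequency, then the token text */
  uint32_t freq = *(uint32_t *)r->record;
  const char *token = r->record + sizeof(uint32_t);
  char tmp[32];
  snprintf(tmp, sizeof(tmp), " (%u)", freq);
  ac_buffer_appends(bh, token);
  ac_buffer_appends(bh, tmp);
}

The inputs in this post are plain text, so the provided ac_task_dump_text is sufficient and is wired up below.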

bool setup_split(ac_task_t *task) {
  ac_task_input_files(task, "split_input", 0.1, get_input_files);
  ac_task_input_dump(task, ac_task_dump_text, NULL);
  ac_task_runner(task, do_nothing);
  return true;
}

ac_task_input_dump must be called after the associated ac_task_input_files call, and before any subsequent ac_task_input_files or ac_task_output call (described later).

$ make input_data_4
$ ./input_data_4 -d ../ac_buffer/ac_buffer_setn.c
$

This still isn't enough. The input record format must be specified. The format choices are delimited (records end with a given character such as a newline or a zero), fixed (records have a fixed length), and prefix (each record is preceded by a 4 byte length prefix). The text files are newline delimited, so this must be specified (again, after ac_task_input_files).
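
For reference, here is a sketch of how each of the three formats might be declared (ac_io_delimiter is used in this post; ac_io_fixed and ac_io_prefix are named later in this post, and their exact signatures are assumptions here):

ac_task_input_format(task, ac_io_delimiter('\n')); /* records end in \n */
ac_task_input_format(task, ac_io_fixed(100));      /* 100 byte records (signature assumed) */
ac_task_input_format(task, ac_io_prefix());        /* 4 byte length prefix (signature assumed) */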

bool setup_split(ac_task_t *task) {
  ac_task_input_files(task, "split_input", 0.1, get_input_files);
  ac_task_input_format(task, ac_io_delimiter('\n'));
  ac_task_input_dump(task, ac_task_dump_text, NULL);
  ac_task_runner(task, do_nothing);
  return true;
}

Trying again...

$ make input_data_4
$ ./input_data_4 -d  ../ac_buffer/ac_buffer_setn.c
#include "ac_buffer.h"
int main(int argc, char *argv[]) {
  ac_buffer_t *bh = ac_buffer_init(10);
  ac_buffer_setn(bh, 'H', 20);
  /* print HHHHHHHHHHHHHHHHHHHH followed with a newline */
  printf("%s\n", ac_buffer_data(bh));
  ac_buffer_destroy(bh);
  return 0;
}

And it works!

The full source code is found in examples/mapreduce2/input_data_4.c. I've omitted the listing as it only adds the following two lines to setup_split.

ac_task_input_format(task, ac_io_delimiter('\n'));
ac_task_input_dump(task, ac_task_dump_text, NULL);

Ordering tasks with a dataflow

Up until now, tasks have been ordered using explicit dependencies. While this can be useful, a more useful approach is to order tasks based upon data dependencies. The scheduler has a built-in data pipeline that allows dependencies between tasks to be determined automatically. If data changes within the pipeline, downstream tasks which depend upon that data will automatically be rerun.

As noted in the Incorporating input files section, the only inputs that need to be defined are those which originate outside the scheduler; all other inputs should be artifacts and/or outputs of other tasks.

In an earlier section, ac_task_input_files was used to connect input from outside ac_schedule. To create a data pipeline, ac_task_output is used. ac_task_output connects tasks together by sending output files from one task to another. Those output files become input files for the destination task, and that input creates a dependency upon the task that produced the files.

void ac_task_output(ac_task_t *task, const char *name, const char *destinations,
                    double out_ram_pct, double in_ram_pct, size_t flags);

In examples/mapreduce2/input_data_4.c, setup_split doesn't output anything. I'll change it to output a file named split.lz4.

bool setup_split(ac_task_t *task) {
  ac_task_input_files(task, "split_input", 0.1, get_input_files);
  ac_task_runner(task, do_nothing);
  return true;
}

changes to

bool setup_split(ac_task_t *task) {
  ac_task_input_files(task, "split_input", 0.1, get_input_files);
  ac_task_output(task, "split.lz4", "first|multi", 0.9, 0.1, AC_OUTPUT_SPLIT);
  ac_task_runner(task, do_nothing);
  return true;
}

The ac_task_output line defines an output file named split.lz4 and two destinations for the file. The dependency upon split can now be removed from setup_first and setup_multi.

bool setup_first(ac_task_t *task) {
  ac_task_dependency(task, "split");
  ac_task_runner(task, do_nothing);
  return true;
}

...

bool setup_multi(ac_task_t *task) {
  ac_task_dependency(task, "all|partition|first|split");
  ac_task_runner(task, do_nothing);
  return true;
}

will change to

bool setup_first(ac_task_t *task) {
  ac_task_runner(task, do_nothing);
  return true;
}

...

bool setup_multi(ac_task_t *task) {
  ac_task_dependency(task, "all|partition|first");
  ac_task_runner(task, do_nothing);
  return true;
}
$ make data_pipeline_1
$ ./data_pipeline_1 -s --dir . --ext c
task: split [0/2]
  reverse dependencies:  first[2] multi[2]
  custom runner
      input[0]: split_input (3)
        ./input_data_3.c (3269)
        ./start.c (671)
        ./order_tasks.c (1221)
      output[0]: split.lz4 split
        destinations: multi[2] first[2]
        tasks/split_0/split_0.lz4
task: split [1/2]
  reverse dependencies:  first[2] multi[2]
  custom runner
      input[0]: split_input (5)
        ./input_data_4.c (3375)
        ./data_pipeline_2.c (3548)
        ./input_data_1.c (1915)
        ./data_pipeline_1.c (3411)
        ./input_data_2.c (3137)
      output[0]: split.lz4 split
        destinations: multi[2] first[2]
        tasks/split_1/split_1.lz4
task: first [0/2]
  dependencies:  split[2]
  reverse dependencies:  multi[2] partition[2]
  custom runner
      input[0]: split.lz4 (2)
        tasks/split_0/split_0_0.lz4 (0)
        tasks/split_1/split_1_0.lz4 (0)
task: first [1/2]
  dependencies:  split[2]
  reverse dependencies:  multi[2] partition[2]
  custom runner
      input[0]: split.lz4 (2)
        tasks/split_0/split_0_1.lz4 (0)
        tasks/split_1/split_1_1.lz4 (0)
task: partition [0/2]
  dependencies:  first[2]
  reverse dependencies:  multi[2]
  reverse partial dependencies:  all[2]
  custom runner
task: all [0/2]
  reverse dependencies:  multi[2]
  partial dependencies:  partition[2]
  custom runner
task: partition [1/2]
  dependencies:  first[2]
  reverse dependencies:  multi[2]
  reverse partial dependencies:  all[2]
  custom runner
task: all [1/2]
  reverse dependencies:  multi[2]
  partial dependencies:  partition[2]
  custom runner
task: multi [0/2]
  dependencies:  first[2] partition[2] all[2] split[2]
  custom runner
      input[0]: split.lz4 (2)
        tasks/split_0/split_0_0.lz4 (0)
        tasks/split_1/split_1_0.lz4 (0)
task: multi [1/2]
  dependencies:  first[2] partition[2] all[2] split[2]
  custom runner
      input[0]: split.lz4 (2)
        tasks/split_0/split_0_1.lz4 (0)
        tasks/split_1/split_1_1.lz4 (0)

Both multi and first still depend upon split even though they no longer call ac_task_dependency. They both have an input which consists of two files. The files come from both split partitions, and the filenames are different for partition 0 and partition 1 of multi and first. When AC_OUTPUT_SPLIT is passed to ac_task_output, the output from each partition is expected to be split into as many pieces as the destination task (or tasks) has partitions. If the destination list has more than one task, all of the destinations must have the same number of partitions.

        split[0]                          split[1]
        /       \                         /       \
split_0_0.lz4   split_0_1.lz4    split_1_0.lz4    split_1_1.lz4
      |              |                 |               |
   first[0]        first[1]           first[0]         first[1]
   multi[0]       multi[1]          multi[0]        multi[1]

split_0_0.lz4   split_1_0.lz4    split_0_1.lz4   split_1_1.lz4
        \       /                        \       /
        first[0]                           first[1]
        multi[0]                          multi[1]

It's important at this point to note that these files aren't actually created. They're expected to be created. Our ac_task_runner is still calling do_nothing which only returns true. The next section will wire up the remaining tasks.

examples/mapreduce2/data_pipeline_1.c

#include "ac_schedule.h"

typedef struct {
  size_t num_inputs;
  ac_io_file_info_t *inputs;
  char *dir;
  char *ext;
  char **extensions;
} custom_arg_t;

bool file_ok(const char *filename, void *arg) {
  custom_arg_t *ca = (custom_arg_t *)arg;
  char **p = ca->extensions;
  while (*p) {
    if (ac_io_extension(filename, *p))
      return true;
    p++;
  }
  return false;
}

bool finish_custom_args(int argc, char **argv, void *arg) {
  custom_arg_t *ca = (custom_arg_t *)arg;
  if (!ca->dir)
    ca->dir = (char *)"sample";
  if (!ca->ext)
    ca->ext = (char *)"tbontb";

  ca->extensions = ac_split2(NULL, ',', ca->ext);
  ca->inputs = ac_io_list(ca->dir, &ca->num_inputs, file_ok, ca);
  if (ca->inputs)
    return true;
  return false;
}

int parse_custom_args(int argc, char **argv, void *arg) {
  if (argc < 2)
    return 0;

  custom_arg_t *ca = (custom_arg_t *)arg;

  if (!strcmp(argv[0], "--dir")) {
    ca->dir = argv[1];
    return 2;
  } else if (!strcmp(argv[0], "--ext")) {
    ca->ext = argv[1];
    return 2;
  }
  return 0;
}

ac_io_file_info_t *get_input_files(ac_worker_t *w, size_t *num_files,
                                   ac_worker_input_t *inp) {
  custom_arg_t *ca = (custom_arg_t *)ac_task_custom_arg(w->task);
  return ac_io_select_file_info(w->worker_pool, num_files, ca->inputs,
                                ca->num_inputs, w->partition,
                                w->num_partitions);
}

bool do_nothing(ac_worker_t *w) { return true; }

bool setup_split(ac_task_t *task) {
  ac_task_input_files(task, "split_input", 0.1, get_input_files);
  ac_task_output(task, "split.lz4", "first|multi", 0.9, 0.1, AC_OUTPUT_SPLIT);
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_first(ac_task_t *task) {
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_partition(ac_task_t *task) {
  ac_task_dependency(task, "first");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_all(ac_task_t *task) {
  ac_task_partial_dependency(task, "partition");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_multi(ac_task_t *task) {
  ac_task_dependency(task, "all|partition|first");
  ac_task_runner(task, do_nothing);
  return true;
}

void custom_usage() {
  printf("Find all tokens ending in .h, .c, and .md and sort by\n");
  printf("frequency descending.\n\n");
  printf("--dir <dir> - directory to scan\n");
  printf("--ext <extensions> - comma delimited list of file extensions to "
         "consider");
}

int main(int argc, char *argv[]) {
  ac_schedule_t *scheduler = ac_schedule_init(argc - 1, argv + 1, 2, 4, 10);
  custom_arg_t custom;
  memset(&custom, 0, sizeof(custom));

  ac_schedule_custom_args(scheduler, custom_usage, parse_custom_args,
                          finish_custom_args, &custom);

  ac_schedule_task(scheduler, "split", true, setup_split);
  ac_schedule_task(scheduler, "partition", true, setup_partition);
  ac_schedule_task(scheduler, "first", true, setup_first);
  ac_schedule_task(scheduler, "all", true, setup_all);
  ac_schedule_task(scheduler, "multi", true, setup_multi);

  ac_schedule_run(scheduler, ac_worker_complete);

  if (custom.inputs)
    ac_free(custom.inputs);
  if (custom.extensions)
    ac_free(custom.extensions);

  ac_schedule_destroy(scheduler);
  return 0;
}

Different ways to output

To get started, all of the ac_task_dependency and ac_task_partial_dependency calls will be replaced with ac_task_output calls.

bool setup_first(ac_task_t *task) {
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_partition(ac_task_t *task) {
  ac_task_dependency(task, "first");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_all(ac_task_t *task) {
  ac_task_partial_dependency(task, "partition");
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_multi(ac_task_t *task) {
  ac_task_dependency(task, "all|partition|first");
  ac_task_runner(task, do_nothing);
  return true;
}

changes to

bool setup_first(ac_task_t *task) {
  ac_task_output(task, "first.lz4", "partition|multi", 0.9, 0.1,
                 AC_OUTPUT_FIRST);
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_partition(ac_task_t *task) {
  ac_task_output(task, "partition.lz4", "all|multi", 0.9, 0.1,
                 AC_OUTPUT_PARTITION);
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_all(ac_task_t *task) {
  ac_task_output(task, "all.lz4", "multi", 0.9, 0.1, AC_OUTPUT_NORMAL);
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_multi(ac_task_t *task) {
  ac_task_runner(task, do_nothing);
  return true;
}

In the last section, AC_OUTPUT_SPLIT was used. AC_OUTPUT_SPLIT is the only output option that expects the given task/partition to write split data (data divided amongst N outputs). AC_OUTPUT_FIRST, AC_OUTPUT_PARTITION, and AC_OUTPUT_NORMAL each write a single file; the difference is in how downstream tasks consume that file as input.

AC_OUTPUT_SPLIT will have multiple outputs based upon the number of partitions of the recipients.

        split[0]                          split[1]
        /       \                         /       \
split_0_0.lz4   split_0_1.lz4    split_1_0.lz4    split_1_1.lz4
      |              |                 |               |
   first[0]        first[1]           first[0]         first[1]
   multi[0]       multi[1]          multi[0]        multi[1]

split_0_0.lz4   split_1_0.lz4    split_0_1.lz4   split_1_1.lz4
        \       /                        \       /
        first[0]                           first[1]
        multi[0]                          multi[1]

AC_OUTPUT_FIRST will have one output, and only the data from the first partition is used downstream. The first partition is the only one that needs to finish before the downstream tasks can continue (at the moment, all partitions execute).

        first[0]                          first[1]
           |                                 |
      first_0.lz4                         first_1.lz4
           |                                 |
       partition[0]                    (null - unused)
       partition[1]
        multi[0]
        multi[1]

AC_OUTPUT_PARTITION will have one output, and the data goes to the corresponding partition downstream. Notice how all[0] and multi[0] can run immediately after partition[0] completes (and before partition[1] completes), and vice versa.

     partition[0]                 partition[1]
         |                             |
   partition_0.lz4              partition_1.lz4
         |                             |
       all[0]                        all[1]
      multi[0]                      multi[1]

AC_OUTPUT_NORMAL will have one output, and all of the data from all of the partitions goes to every downstream partition of the destination tasks.

       all[0]                 all[1]
         |                      |
      all_0.lz4              all_1.lz4
         |                      |
      multi[0]                multi[0]
      multi[1]                multi[1]

Running the -s option with the multi task selected will confirm this.

$ make data_pipeline_2
$ ./data_pipeline_2 -s -t multi
task: multi [0/2]
  dependencies:  all[2] first[2] split[2]
  partial dependencies:  partition[2]
  custom runner
      input[0]: split.lz4 (2)
        tasks/split_0/split_0_0.lz4 (0)
        tasks/split_1/split_1_0.lz4 (0)
      input[1]: partition.lz4 (1)
        tasks/partition_0/partition_0.lz4 (0)
      input[2]: first.lz4 (1)
        tasks/first_0/first_0.lz4 (0)
      input[3]: all.lz4 (2)
        tasks/all_0/all_0.lz4 (0)
        tasks/all_1/all_1.lz4 (0)
task: multi [1/2]
  dependencies:  all[2] first[2] split[2]
  partial dependencies:  partition[2]
  custom runner
      input[0]: split.lz4 (2)
        tasks/split_0/split_0_1.lz4 (0)
        tasks/split_1/split_1_1.lz4 (0)
      input[1]: partition.lz4 (1)
        tasks/partition_1/partition_1.lz4 (0)
      input[2]: first.lz4 (1)
        tasks/first_0/first_0.lz4 (0)
      input[3]: all.lz4 (2)
        tasks/all_0/all_0.lz4 (0)
        tasks/all_1/all_1.lz4 (0)

As expected, input[0] and input[3] each have two input files, and input[1] and input[2] each have one.

examples/mapreduce2/data_pipeline_2.c

#include "ac_schedule.h"

typedef struct {
  size_t num_inputs;
  ac_io_file_info_t *inputs;
  char *dir;
  char *ext;
  char **extensions;
} custom_arg_t;

bool file_ok(const char *filename, void *arg) {
  custom_arg_t *ca = (custom_arg_t *)arg;
  char **p = ca->extensions;
  while (*p) {
    if (ac_io_extension(filename, *p))
      return true;
    p++;
  }
  return false;
}

bool finish_custom_args(int argc, char **argv, void *arg) {
  custom_arg_t *ca = (custom_arg_t *)arg;
  if (!ca->dir)
    ca->dir = (char *)"sample";
  if (!ca->ext)
    ca->ext = (char *)"tbontb";

  ca->extensions = ac_split2(NULL, ',', ca->ext);
  ca->inputs = ac_io_list(ca->dir, &ca->num_inputs, file_ok, ca);
  if (ca->inputs)
    return true;
  return false;
}

int parse_custom_args(int argc, char **argv, void *arg) {
  if (argc < 2)
    return 0;

  custom_arg_t *ca = (custom_arg_t *)arg;

  if (!strcmp(argv[0], "--dir")) {
    ca->dir = argv[1];
    return 2;
  } else if (!strcmp(argv[0], "--ext")) {
    ca->ext = argv[1];
    return 2;
  }
  return 0;
}

ac_io_file_info_t *get_input_files(ac_worker_t *w, size_t *num_files,
                                   ac_worker_input_t *inp) {
  custom_arg_t *ca = (custom_arg_t *)ac_task_custom_arg(w->task);
  return ac_io_select_file_info(w->worker_pool, num_files, ca->inputs,
                                ca->num_inputs, w->partition,
                                w->num_partitions);
}

bool do_nothing(ac_worker_t *w) { return true; }

bool setup_split(ac_task_t *task) {
  ac_task_input_files(task, "split_input", 0.1, get_input_files);
  ac_task_input_format(task, ac_io_delimiter('\n'));
  ac_task_input_dump(task, ac_task_dump_text, NULL);
  ac_task_output(task, "split.lz4", "first|multi", 0.9, 0.1, AC_OUTPUT_SPLIT);
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_first(ac_task_t *task) {
  ac_task_output(task, "first.lz4", "partition|multi", 0.9, 0.1,
                 AC_OUTPUT_FIRST);
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_partition(ac_task_t *task) {
  ac_task_output(task, "partition.lz4", "all|multi", 0.9, 0.1,
                 AC_OUTPUT_PARTITION);
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_all(ac_task_t *task) {
  ac_task_output(task, "all.lz4", "multi", 0.9, 0.1, AC_OUTPUT_NORMAL);
  ac_task_runner(task, do_nothing);
  return true;
}

bool setup_multi(ac_task_t *task) {
  ac_task_runner(task, do_nothing);
  return true;
}

void custom_usage() {
  printf("Find all tokens ending in .h, .c, and .md and sort by\n");
  printf("frequency descending.\n\n");
  printf("--dir <dir> - directory to scan\n");
  printf("--ext <extensions> - comma delimited list of file extensions to "
         "consider");
}

int main(int argc, char *argv[]) {
  ac_schedule_t *scheduler = ac_schedule_init(argc - 1, argv + 1, 2, 4, 10);
  custom_arg_t custom;
  memset(&custom, 0, sizeof(custom));

  ac_schedule_custom_args(scheduler, custom_usage, parse_custom_args,
                          finish_custom_args, &custom);

  ac_schedule_task(scheduler, "split", true, setup_split);
  ac_schedule_task(scheduler, "partition", true, setup_partition);
  ac_schedule_task(scheduler, "first", true, setup_first);
  ac_schedule_task(scheduler, "all", true, setup_all);
  ac_schedule_task(scheduler, "multi", true, setup_multi);

  ac_schedule_run(scheduler, ac_worker_complete);

  if (custom.inputs)
    ac_free(custom.inputs);
  if (custom.extensions)
    ac_free(custom.extensions);

  ac_schedule_destroy(scheduler);
  return 0;
}

Dumping Output

Dumping output is very similar to dumping input, which required specifying a format and a dump method. Recall the change to dump input...

bool setup_split(ac_task_t *task) {
  ac_task_input_files(task, "split_input", 0.1, get_input_files);
  ac_task_input_format(task, ac_io_delimiter('\n'));
  ac_task_input_dump(task, ac_task_dump_text, NULL);

Dumping output is similar, except input is replaced with output. The format can be ac_io_delimiter, ac_io_fixed, or ac_io_prefix. If the output is text and line delimited, it looks like the following.

  ac_task_output(task, "split.lz4", "first|multi", 0.9, 0.1, AC_OUTPUT_SPLIT);
  ac_task_output_format(task, ac_io_delimiter('\n'));
  ac_task_output_dump(task, ac_task_dump_text, NULL);  

This will be explored more in the next sections. At the moment, no files are actually created. The next post will explore setting up the example from the first post using the ac_schedule library.