group-by processing with Rcpp and data.table

GForce optimization

data.table is a fantastic R package because of its speed, its compact syntax and lots of other things: fread, roll joins, key… One reason of its speed is the internal GForce optimization which avoids calling a function for each group.

By default, GForce (and other optimizations) are enabled. In this example, sum function is silently transformed into a gsum, an internal data.table GForce version which avoids calling sum for each group.

> system.time(dt[, sum(a), by=id, verbose=T])
Detected that j uses these columns: a
Finding groups using forderv ... 0.234s elapsed (1.988s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.010s
elapsed (0.012s cpu)
lapply optimization is on, j unchanged as 'sum(a)'
GForce optimized j to 'gsum(a)'
Making each group and running j (GForce TRUE) ... gforce initial population of
grp took 0.021
gforce assign high and low took 0.037
This gsum took (narm=FALSE) ... gather took ... 0.024s
0.042s
gforce eval took 0.042
0.127s elapsed (1.200s cpu)
utilisateur     système      écoulé
      3.200       0.676       0.374

If optimizations are turned off (datatable.optimize = 0), sum is called 1800000 times because dt has 1800000 groups. In this example, GForce_ is 15-20x faster than R base sum function.

> options(datatable.optimize = 0)
> system.time(dt[, sum(a), by=id, verbose=T])
Detected that j uses these columns: a
Finding groups using forderv ... 0.233s elapsed (1.904s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.010s elapsed (0.008s cpu)
All optimizations are turned off
Making each group and running j (GForce FALSE) ...
  memcpy contiguous groups took 1.070s for 1800000 groups
  eval(j) took 1.817s for 1800000 calls
4.869s elapsed (2.924s cpu)
utilisateur     système      écoulé
      4.840       2.472       5.114

This example shows that it’s important to avoid calling function for each by group.

na_fill function

I often work with date and state data. For a variable V1 and 2 ids, I have this kind of table:

id          date            V1
    id1     2001-01-01      A
    id1     2003-01-01      B
    id1     2008-01-01      A
    id1     2010-01-01      Z
    id2     2004-01-01      A
    id2     2011-01-01      Z

Sometimes there are missing values.

id          date            V1
    id1     2001-01-01      A
    id1     2003-01-01      B
    id1     2004-01-01      .
    id1     2005-01-01      .
    id1     2008-01-01      A
    id1     2010-01-01      Z
    id2     2002-01-01      .
    id2     2004-01-01      A
    id2     2009-01-01      .
    id2     2011-01-01      Z

In this case, missing values can be replaced by the last non missing value. It can be made with a nafill type function and the last observation carried forward (LOCF) option like that:

id          date            V1
    id1     2001-01-01      A
    id1     2003-01-01      B
    id1     2004-01-01      B
    id1     2005-01-01      B
    id1     2008-01-01      A
    id1     2010-01-01      Z
    id2     2002-01-01      .
    id2     2004-01-01      A
    id2     2009-01-01      A
    id2     2011-01-01      Z

3 or 4 years ago, I didn’t find a R package with such a function. There’s a na.fill function in packages zoo but it focuses on numeric variables and makes an interpolation.

This kind of function is not easy to write efficiently in R because the loop depends of the previous term,but it’s easy in C or C++. And there is a very impressive package which permits to write C++ functions in R : Rcpp.

Writing a na_fill function with Rcpp is not difficult. Here a version for LOCF:

#include <Rcpp.h>
using namespace Rcpp;

// [[Rcpp::export]]
StringVector Cna_fill_char(StringVector xx):
  StringVector x = clone(xx);
  R_xlen_t n = x.size();
  for(R_xlen_t i = 1; i<n; i++) {
    if(StringVector::is_na(x[i]) && !StringVector::is_na(x[i-1])) {
      x[i] = x[i-1];
    }
  }
  return x;
}

clone instruction is mandatory to avoid inplace processing on xx. But on big tables, this can be a good idea to avoid this step. So an inplace parameter is added with a line like:

StringVector x = inplace ? xx : clone(xx);

Another improvement that can be done is to implement NOCB (next observation carried backward). Final implementation adds a type parameter with 3 values :

  • 1 : LOCF

  • 2 : NOCB

  • 3 : LOCF then NOCB

Source:

// [[Rcpp::export]]
StringVector Cna_fill_char(StringVector xx, unsigned int type = 1, bool inplace = false) {
  StringVector x = inplace ? xx : clone(xx);
  R_xlen_t n = x.size();
  if (type & 0b01) {
    for(R_xlen_t i = 1; i<n; i++) {
      if(StringVector::is_na(x[i]) && !StringVector::is_na(x[i-1])) {
        x[i] = x[i-1];
      }
    }
  }
  if (type & 0b10) {
    for(R_xlen_t i = n - 1; i>=0; i--) {
      if(StringVector::is_na(x[i-1]) && !StringVector::is_na(x[i])) {
        x[i-1] = x[i];
      }
    }
  }
  return x;
}

C++ (and C) is a typed language. This version has to be duplicated for other types, NumericVector and IntegerVector. And dispatch is made with a R function like this:

na_fill <- function(x, type = 1, inplace = F) {
  if (is.object(x)) {
    stop("x is not a primitive type", call. = FALSE)
  }
  if (is.integer(x))
    Cna_fill_int(x, type, inplace)
  else if (is.double(x))
    Cna_fill_num(x, type, inplace)
  else if (is.character(x))
    Cna_fill_char(x, type, inplace)
  else
    stop("Unimplemented type : ", typeof(x))
}

Real data : ~ 2 millions ids and ~ 50 rows by id

Typical data tables I used are ~ 100M rows with ~ 2 millions ids and ~ 50 rows by id.

For example, let’s create a data.table like that:

ngrp <- 1800000
nbygrp <- 50

dt <- data.table(
  id=rep(1:ngrp, each=nbygrp),
  a=sample(1:100, nbygrp * ngrp, replace=T),
  b=sample(LETTERS, nbygrp * ngrp, replace=T),
  c=sample((1:100 + 0.5), nbygrp * ngrp, replace=T),
  d=sample(1:100, nbygrp * ngrp, replace=T),
  e=sample(1:100, nbygrp * ngrp, replace=T)
)
dt[(1:.N %% 2 == 0), a:=NA]
dt[(1:.N %% 3 == 0), b:=NA]
dt[(1:.N %% 4 == 0), c:=NA]
dt[(1:.N %% 5 == 0), d:=NA]
dt[(1:.N %% 6 == 0), e:=NA]

Now we call na_fill by id for columns a to e:

> vars = c("a", "b", "c", "d", "e")
> outvars <- paste0("z", vars)
> system.time(dt[, (outvars) := lapply(.SD, na_fill), by="id", .SDcols = vars, verbose=T])
Finding groups using forderv ... 0.249s elapsed (2.120s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.013s elapsed (0.016s cpu)
lapply optimization changed j from 'lapply(.SD, na_fill)' to 'list(na_fill(a), na_fill(b), na_fill(c), na_fill(d), na_fill(e))'
Old mean optimization is on, left j unchanged.
Making each group and running j (GForce FALSE) ...
  memcpy contiguous groups took 1.621s for 1800000 groups
  eval(j) took 55.646s for 1800000 calls
00:01:02 elapsed (59.5s cpu)
utilisateur     système      écoulé
     61.676       3.252      62.511

Not bad. But not very fast. With the same data and variable a, gsum takes 0.3 seconds. Why ? Because of message eval(j) took 55.646s for 1800000 calls. Once again, call is a costly operation in R. What would be great is to use GForce with Rcpp na_fill function. But I can’t find a way. In my understanding, GForce_ is very internal in data.table and GForce functions like gsum, gmean… are C only functions, invisibles from R.

Another solution is to provide group information to Rcpp na_fill function. But it’s not an easy task and It can be very slow. But data.table gives a solution. In the verbose output, note this line:

Finding groups using forderv ... 0.249s elapsed (2.120s cpu)

What is this forderv function which finds groups ? It’s not exported but is accessible with the :::. Let’s have a look:

> data.table:::forderv(dt, by="id", retGrp = T)
integer(0)
attr(,"starts")
   [1]     1    51   101   151   201   251   301   351   401   451   501   551   601

I will discuss integer(0) further. But attr starts is very interesting: it’s a integer vector of starting indices for each group. First id (1) starts in row 1, second id (2) starts in row 51 and so on. But row of what table ? Here, returning integer(0) means that table was already sorted and starts rows are rows of the original data.table.

But what happens if table is not sorted on by variable. Let’s have a look:

> data.table:::forderv(dt, by="a", retGrp = T)
   [1]    2    4    6    8   10   12   14   16   18   20   22
attr(,"starts")
  [1]        1 45000001 45449955 45898729 46349428 46798821

Now, forderv returns a table and starts gives row indices of this table. The first group begins at rows 1 and ends at row 45000001 - 1 = 45000000 and the first indices of data.table rows are 2, 4, 6…

This group is constitued of all the rows with NA value for variable a. Every even rows in data.table are NA and that’s why first indices of forderv return table are 2, 4, 6…

Of course, forderv is not easy to understand and that’s why it’s not exported. But, it was exactly what I need to improve in my na_fill function.

The idea is to pass forderv return value, which contains all valuable information on by groups, to the Rcpp function. In the source below, its the rows parameter ; the other parameters are the same as first na_fill function.

The tricky part is to deal between R indices which are 1-indexed and C++ indices which are 0-indexed and to correctly use rows and starts integer vectors to retrieve data.table rows.

// [[Rcpp::export]]
IntegerVector Cna_fill_int_grp(IntegerVector x, IntegerVector rows, unsigned int opt = 1, bool inplace = false) {
  R_xlen_t n = x.size();
  R_xlen_t nrows = rows.size();

  if (nrows != 0 && n != rows.size())
    stop("x and rows must have the same length");
  if (!rows.hasAttribute("starts"))
    stop("rows must have 'starts' attribute");

  IntegerVector grps = rows.attr("starts");
  R_xlen_t ngrps = grps.size();

  IntegerVector ret = inplace ? x : clone(x);

  for(int g=0; g<ngrps; g++) {
        // for each group, loop goes from f to <l
    R_xlen_t f = grps[g] - 1; // start indice of group g (C indice = R indice - 1)
    R_xlen_t l = g == (ngrps - 1) ? n : grps[g + 1] - 1; // last indice (n if last group)

    if (opt & 0b01) {
      for(R_xlen_t i = f + 1; i < l; i++) {
        R_xlen_t r  = nrows == 0 ? i : rows[i] - 1;
        R_xlen_t r1 = nrows == 0 ? i - 1 : rows[i - 1] - 1;
        if(IntegerVector::is_na(ret[r]) && !IntegerVector::is_na(ret[r1])) {
          ret[r] = ret[r1];
        }
      }
    }

    if (opt & 0b10) {
      for(R_xlen_t i = l - 1; i > f; i--) {
        R_xlen_t r  = nrows == 0 ? i : rows[i] - 1;
        R_xlen_t r1 = nrows == 0 ? i - 1 : rows[i - 1] - 1;
        if(IntegerVector::is_na(ret[r1]) && !IntegerVector::is_na(ret[r])) {
          ret[r1] = ret[r];
        }
      }
    }

  }
  return ret;
}

Timings

na_fill_grp with new variables

> vars = c("a", "b", "c", "d", "e")
> outvars <- paste0("z", vars)
> system.time(dt[, (outvars):= na_fill_grp(dt, var=vars, by="id")])
utilisateur     système      écoulé
      3.832       1.556       3.426

na_fill inplace

> dt1 <- copy(dt)
> system.time(dt1[, (vars) := lapply(.SD, function(x) na_fill(x, inplace=T)), by="id", .SDcols = vars, verbose=T])
Finding groups using forderv ... 0.233s elapsed (1.872s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.010s elapsed (0.012s cpu)
lapply optimization changed j from 'lapply(.SD, function(x) na_fill(x, inplace = T))' to 'list(..FUN1(a), ..FUN1(b), ..FUN1(c), ..FUN1(d), ..FUN1(e))'
Old mean optimization is on, left j unchanged.
Making each group and running j (GForce FALSE) ...
  memcpy contiguous groups took 1.540s for 1800000 groups
  eval(j) took 61.073s for 1800000 calls
00:01:06 elapsed (00:01:04 cpu)
utilisateur     système      écoulé
     66.032       2.792      66.585

na_fill_grp inplace

> dt2 <- copy(dt)
> system.time(na_fill_grp(dt2, var=vars, by="id", inplace=T))
utilisateur     système      écoulé
      2.612       0.288       1.090
> identical(dt1, dt2)
[1] TRUE

Using Rcpp function with a data.table like by-group processing gives a significative improvment with na_fill (about 60x faster). It’s relatevely easy to adapt na_fill_grp because the hardest part is the same for all functions : iterating through groups and rows.

Comparaison with new data.table nafill

Current data.table branch master implements a new nafill function. Currently, characters are not supported which is essential for me (character support is on the roadmap). It implements a set function for inplace operations.

Let’s bench inplace versions with non character variables with the default number of threads (14 in my case) and with only one thread to avoid forming openmp team of threads:

> getDTthreads()
[1] 14
> dt0 <- copy(dt)
> system.time(dt0[, (vars2):=setnafill(.SD, type="locf", cols=vars2), .SDcols=vars2, by="id"])
utilisateur     système      écoulé
    905.860      27.588      67.498

> dt1 <- copy(dt)
> system.time(dt1[, (vars2) := lapply(.SD, function(x) na_fill(x, inplace=T)), by="id", .SDcols = vars2])
utilisateur     système      écoulé
     49.804       2.620      50.381

> dt2 <- copy(dt)
> system.time(na_fill_grp(dt2, var=vars2, by="id", inplace=T))
utilisateur     système      écoulé
      2.168       0.328       0.690

> identical(dt0, dt1)
[1] TRUE

> identical(dt1, dt2)
[1] TRUE


> setDTthreads(1)
> getDTthreads()
[1] 1
>

> vars2 = c("a", "c", "d", "e")
> dt0 <- copy(dt)
> system.time(dt0[, (vars2):=setnafill(.SD, type="locf", cols=vars2), .SDcols=vars2, by="id"])
utilisateur     système      écoulé
     35.124      23.620      58.733
> dt1 <- copy(dt)
> system.time(dt1[, (vars2) := lapply(.SD, function(x) na_fill(x, inplace=T)), by="id", .SDcols = vars2])
utilisateur     système      écoulé
     50.492       2.436      52.918
> dt2 <- copy(dt)
> system.time(na_fill_grp(dt2, var=vars2, by="id", inplace=T))
utilisateur     système      écoulé
      1.304       0.184       1.489
> identical(dt0, dt1)
[1] TRUE
> identical(dt1, dt2)
[1] TRUE

setnafill performance is close to Rcpp na_fill above version and is not optimized for by processing. Jan Gorecki, a data.table developper, suggests to set thread to 1L : he’s right because nafill function benefits of this (58s vs 67s). Rcpp versions doesn’t benefit of using only one thread.

Comparaison with GForce

For example, here is an simple equivalent of gsum written with compact loop:

#include <Rcpp.h>
using namespace Rcpp;

// [[Rcpp::export]]
NumericVector Csum_grp(NumericVector x, IntegerVector rows) {
  R_xlen_t n = x.size();
  R_xlen_t nrows = rows.size();

  if (nrows != 0 && n != rows.size())
    stop("x and rows must have the same length");
  if (!rows.hasAttribute("starts"))
    stop("rows must have 'starts' attribute");

  IntegerVector grps = rows.attr("starts");
  R_xlen_t ngrps = grps.size();

  NumericVector ret = NumericVector(n);

  for(int g=0; g<ngrps; g++) {
    double somme = 0;
    for(R_xlen_t ii = grps[g] - 1, i  = (nrows == 0 ? ii : rows[ii] - 1);
        ii < (g == (ngrps - 1) ? n : grps[g + 1] - 1);
        ii++, i  = (nrows == 0 ? ii : rows[ii] - 1)) {
      somme += x[i];
    }
    for(R_xlen_t ii = grps[g] - 1, i  = (nrows == 0 ? ii : rows[ii] - 1);
        ii < (g == (ngrps - 1) ? n : grps[g + 1] - 1);
        ii++, i  = (nrows == 0 ? ii : rows[ii] - 1)) {
      ret[i] = somme;
    }
  }
  return ret;
}

The above version use multiple C ternary operators to check if nrows == 0 (case where data.table is already sorted). Michael Chirico, another data,table developper, suggests to shift nrows==0 branching as early as possible so you get more smaller loops.

Here is a second version:

// [[Rcpp::export]]
NumericVector Csum_grp2(NumericVector x, IntegerVector rows) {
  R_xlen_t n = x.size();
  R_xlen_t nrows = rows.size();

  if (nrows != 0 && n != rows.size())
    stop("x and rows must have the same length");
  if (!rows.hasAttribute("starts"))
    stop("rows must have 'starts' attribute");

  IntegerVector grps = rows.attr("starts");
  R_xlen_t ngrps = grps.size();
  NumericVector ret = NumericVector(n);

  for(int g=0; g<ngrps; g++) {
    double somme = 0;
    if (nrows == 0) {
      for(R_xlen_t i = grps[g] - 1;
          i < (g == (ngrps - 1) ? n : grps[g + 1] - 1);
          i++) {
        somme += x[i];
      }
      for(R_xlen_t i = grps[g] - 1;
          i < (g == (ngrps - 1) ? n : grps[g + 1] - 1);
          i++) {
        ret[i] = somme;
      }
    } else {
      for(R_xlen_t i = grps[g] - 1;
          i < (g == (ngrps - 1) ? n : grps[g + 1] - 1);
          i++) {
        somme += x[rows[i] - 1];
      }
      for(R_xlen_t i = grps[g] - 1;
          i < (g == (ngrps - 1) ? n : grps[g + 1] - 1);
          i++) {
        ret[rows[i] - 1] = somme;
      }
    }
  }
  return ret;
}

Csum_grp and Csum_grp2 can be compared with gsum with default number of threads and one thread.

With sorted variable id:

> setDTthreads()
> print(getDTthreads())
[1] 14

> system.time(dt[, sa := Csum_grp(c, group(.SD, "id"))])
   user  system elapsed
  2.004   0.464   0.643

> system.time(dt[, sb := Csum_grp2(c, group(.SD, "id")), verbose=T])
Assigning to all 90000000 rows
RHS_list_of_columns == false
Direct plonk of unnamed RHS, no copy.
   user  system elapsed
  1.944   0.424   0.589

> system.time(dt[, sc := sum(c), by=id, verbose=T])
Detected that j uses these columns: c
Finding groups using forderv ... 0.210s elapsed (1.652s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.012s elapsed (0.012s cpu)
lapply optimization is on, j unchanged as 'sum(c)'
Old mean optimization is on, left j unchanged.
Making each group and running j (GForce FALSE) ...
  memcpy contiguous groups took 0.980s for 1800000 groups
  eval(j) took 1.373s for 1800000 calls
4.827s elapsed (3.084s cpu)
   user  system elapsed
  4.748   2.336   5.051

> system.time(dt[, sum(c), by=id, verbose=T])
Detected that j uses these columns: c
Finding groups using forderv ... 0.209s elapsed (1.684s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.012s elapsed (0.012s cpu)
lapply optimization is on, j unchanged as 'sum(c)'
GForce optimized j to 'gsum(c)'
Making each group and running j (GForce TRUE) ... gforce initial population of grp took 0.020
gforce assign high and low took 0.060
This gsum took (narm=FALSE) ... gather took ... 0.037s
0.055s
gforce eval took 0.055
0.161s elapsed (1.476s cpu)
   user  system elapsed
  3.172   0.828   0.384

> setDTthreads(1)
> print(getDTthreads())
[1] 1

> system.time(dt[, sa := Csum_grp(c, group(.SD, "id"))])
   user  system elapsed
  1.116   0.296   1.415

> system.time(dt[, sb := Csum_grp2(c, group(.SD, "id")), verbose=T])
Assigning to all 90000000 rows
RHS_list_of_columns == false
Direct plonk of unnamed RHS, no copy.
   user  system elapsed
  1.188   0.276   1.462

> system.time(dt[, sc := sum(c), by=id, verbose=T])
Detected that j uses these columns: sc,c
Finding groups using forderv ... 1.049s elapsed (0.896s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.011s elapsed (0.008s cpu)
lapply optimization is on, j unchanged as 'sum(c)'
Old mean optimization is on, left j unchanged.
Making each group and running j (GForce FALSE) ...
  memcpy contiguous groups took 1.100s for 1800000 groups
  eval(j) took 1.444s for 1800000 calls
4.957s elapsed (2.924s cpu)
   user  system elapsed
  3.832   2.188   6.018

> system.time(dt[, sum(c), by=id, verbose=T])
Detected that j uses these columns: c
Finding groups using forderv ... 0.974s elapsed (0.860s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.010s elapsed (0.008s cpu)
lapply optimization is on, j unchanged as 'sum(c)'
GForce optimized j to 'gsum(c)'
Making each group and running j (GForce TRUE) ... gforce initial population of grp took 0.133
gforce assign high and low took 0.441
This gsum took (narm=FALSE) ... gather took ... 0.374s
0.533s
gforce eval took 0.533
1.131s elapsed (0.796s cpu)
   user  system elapsed
  1.668   0.448   2.117

> print(dt[sa!=sb])
Empty data.table (0 rows and 9 cols): id,a,b,c,d,e...

> print(dt[sb!=sc])
Empty data.table (0 rows and 9 cols): id,a,b,c,d,e...

With unsorted variable b and 26 groups:

> setDTthreads()

> print(getDTthreads())
[1] 14

> system.time(dt[, sa := Csum_grp(c, group(.SD, "b"))])
   user  system elapsed
  3.000   0.540   2.119

> system.time(dt[, sb := Csum_grp2(c, group(.SD, "b")), verbose=T])
Assigning to all 90000000 rows
RHS_list_of_columns == false
Direct plonk of unnamed RHS, no copy.
   user  system elapsed
  3.008   0.512   2.127

> system.time(dt[, sc := sum(c), by=b, verbose=T])
Detected that j uses these columns: sc,c
Finding groups using forderv ... 0.163s elapsed (1.136s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.000s elapsed (0.000s cpu)
Getting back original order ... 0.000s elapsed (0.000s cpu)
lapply optimization is on, j unchanged as 'sum(c)'
Old mean optimization is on, left j unchanged.
Making each group and running j (GForce FALSE) ...
  collecting discontiguous groups took 2.086s for 26 groups
  eval(j) took 0.103s for 26 calls
2.599s elapsed (2.864s cpu)
   user  system elapsed
  4.072   0.472   2.772

> system.time(dt[, sum(c), by=id, verbose=T])
Detected that j uses these columns: c
Finding groups using forderv ... 0.208s elapsed (1.664s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.010s elapsed (0.012s cpu)
lapply optimization is on, j unchanged as 'sum(c)'
GForce optimized j to 'gsum(c)'
Making each group and running j (GForce TRUE) ... gforce initial population of grp took 0.021
gforce assign high and low took 0.060
This gsum took (narm=FALSE) ... gather took ... 0.037s
0.051s
gforce eval took 0.051
0.155s elapsed (1.428s cpu)
   user  system elapsed
  3.108   0.820   0.376

Somme comments about results:

  1. When using := with a by statement, gforce is not used so the timings are better with Rcpp version (1.5s vs 6.0s).

  2. Timings are close between Rcpp/forderv and gforce/gsum. Of course, there is no interest in replacing GForce functions. But it’s a good news because it gives a way to have equivalent of GForce performance with the ease of use of customiized functions written in Rcpp.

  3. Version with nrows==0 are slightly better but need two distinct loops

  4. When setting nthread to 1, forderv takes more time (1.0s vs 0.2s) because it’s no more multithreaded. So global timings are quite the same. for this kind of calculation, multithreading is not a decisive advantage.

  5. With small number of groups, gforce is the fastest (0.4s vs 2.1s)

Many thanks to Jan Gorecki and Micheal Chirico for reviews on this article.

Download sources