Running aggregates are calculations that are commonly used for data analysis. This article is the first of several articles covering various techniques to calculate running aggregates with a focus on performance. I’ll discuss how to analyze the complexity of a solution, as well as explain how to predict performance changes based on changing variables in the data or the solution. This article focuses on set-based solutions using subqueries and joins.
A running aggregate is an aggregate of a measure that keeps accumulating over an ordered sequence, possibly within partitions. Consider a table called Sales that contains employees’ daily sales quantities and values. Run the code in Listing 1 to create the Sales table in the tempdb database for demo purposes. (I provide the code to populate this table with sample data later in the article.)
SET NOCOUNT ON; USE tempdb; IF OBJECT_ID('dbo.Sales', 'U') IS NOT NULL DROP TABLE dbo.Sales; CREATE TABLE dbo.Sales ( empid INT NOT NULL, -- partitioning column dt DATETIME NOT NULL, -- ordering column qty INT NOT NULL DEFAULT (1), -- measure 1 val MONEY NOT NULL DEFAULT (1.00), -- measure 2 CONSTRAINT PK_Sales PRIMARY KEY(empid, dt) ); GO
An example of a running aggregate is a running sum of the quantity (or value) for each employee and day. That is, for each employee and day, calculate the sum of quantity from the beginning of the employee’s activity until the current day. The elements involved in the calculation of a running aggregate include the measure you are aggregating (qty in our example), the ordering attribute (dt in our case), and if relevant, the partitioning attribute (empid). This is the most basic and classic type of a running aggregate. Variations exist that have different boundary points to determine the window of rows that the aggregate operates on. However, I’ll stick to the classic form since my focus is performance and in this sense the variations should be similar for the most part.
Regarding indexing guidelines, typically you need to come up with a solution before you can evaluate indexing strategies for it. With running aggregates, though, most solutions would benefit from the same indexing strategy. The ideal index is a covering one created on the partitioning columns followed by the ordering columns as the index keys, and the measures as included nonkey columns. One way to achieve such an indexing strategy is to create a clustered index on the partitioning columns followed by the ordering columns as the clustered index keys. Since the leaf rows of the clustered index will contain all other columns from the table, you get coverage of the measures as well. Our Sales table has such an index, which was implicitly created because of the primary key defined on (empid, dt). If you need your clustered index to be defined differently for your own reasons, another way to implement this strategy is to create a nonclustered index on the partitioning columns and sort columns as keys, and on the attributes holding the measures as included nonkey columns. The index definition in our case would look like this (don’t run this command, because we already have a clustered index that supports the desired strategy):
CREATE UNIQUE INDEX idx_runagg ON dbo.Sales(empid, dt) INCLUDE(qty, val);
Next, I provide instructions to populate the Sales table with sample data before presenting solutions. First, run the code in Listing 2 to create a helper function called GetNums.
IF OBJECT_ID('dbo.GetNums', 'IF') IS NOT NULL DROP FUNCTION dbo.GetNums; GO CREATE FUNCTION dbo.GetNums(@n AS BIGINT) RETURNS TABLE AS RETURN WITH L0 AS(SELECT 1 AS c UNION ALL SELECT 1), L1 AS(SELECT 1 AS c FROM L0 AS A CROSS JOIN L0 AS B), L2 AS(SELECT 1 AS c FROM L1 AS A CROSS JOIN L1 AS B), L3 AS(SELECT 1 AS c FROM L2 AS A CROSS JOIN L2 AS B), L4 AS(SELECT 1 AS c FROM L3 AS A CROSS JOIN L3 AS B), L5 AS(SELECT 1 AS c FROM L4 AS A CROSS JOIN L4 AS B), Nums AS(SELECT ROW_NUMBER() OVER(ORDER BY (SELECT 0)) AS n FROM L5) SELECT n FROM Nums WHERE n GO
GetNums accepts a number as input and returns a table result with a sequence of integers in the range 1 and the input number. Run the code in Listing 3 to populate the table with sample data. Notice in Listing 3 that you can set the number of partitions and the partition size. Currently, Listing 3 has 10,000 partitions with a partition size of 10. Later when I want to demonstrate the effect of changes in number of partitions or partition size, I’ll rerun the code in Listing 3 with other values as needed.
DECLARE @num_partitions AS INT, @rows_per_partition AS INT, @start_dt AS DATETIME; SET @num_partitions = 10000; SET @rows_per_partition = 10; SET @start_dt = '20090101'; TRUNCATE TABLE dbo.Sales; INSERT INTO dbo.Sales WITH (TABLOCK) (empid, dt) SELECT NP.n AS empid, DATEADD(day, RPP.n - 1, @start_dt) AS dt FROM dbo.GetNums(@num_partitions) AS NP CROSS JOIN dbo.GetNums(@rows_per_partition) AS RPP;
Set-Based Solution Using Subqueries
You might wonder why I’m covering both subqueries and joins in this article. The reason for this is that some people simply prefer to use subqueries, whereas others prefer to use joins. In addition, although some performance aspects are similar in both approaches, certain aspects of performance are different between the two approaches.
Let’s start with the subquery approach that Listing 4 shows. The outer query is against the Sales table, aliased as S1. From the outer instance you return the employee ID, quantity, and date. A subquery is in charge of calculating the running aggregate. The subquery is against a second instance of the Sales table, aliased as S2. The subquery filters the rows from S2 where the employee ID is the same as the one in S1, and the date is smaller than or equal to the one in S1. In the SELECT list, the subquery applies the SUM function to the qty attribute to aggregate all quantities from the qualifying rows. To evaluate the performance aspects of the solution, examine the execution plan for the solution query that Figure 1 shows.
SELECT empid, dt, qty, (SELECT SUM(S2.qty) FROM dbo.Sales AS S2 WHERE S2.empid = S1.empid AND S2.dt FROM dbo.Sales AS S1;
As you can see, the plan shows that the query was processed with a Nested Loops join. The outer input of the join is the result of a clustered index scan representing the outer instance of the Sales table that was aliased as S1. For each of those rows, the loop applies activity against the inner input of the join, which is the instance of the Sales table aliased as S2. This activity involves a clustered index seek followed by a partial scan to fetch all rows with the same empid as in the outer row, and a dt that is smaller than or equal to the one in the outer row. Those rows returned from the clustered index seek and partial scan are then aggregated with a Stream Aggregate operator.
Now that you’re familiar with the plan, let’s evaluate its complexity. Although we could consider various aspects of the plan’s complexity, let’s keep things simple by focusing on the number of rows that are processed by the plan and that need to be aggregated. For this purpose, we need to take into consideration the number of partitions involved (call it p), and the average partition size (call it r). As you can see in the plan, the bulk of the cost is associated with the activity that is run in a loop scanning the relevant range of rows in the clustered index for each of the outer rows. This loop runs as many times as the number of rows in the table, which is roughly pr. For each of those rows, the number of rows scanned, as mentioned earlier, is as many as the number of rows with the same empid as in the outer row, and a dt that is smaller than or equal to the one in the outer row. For the first dt value for a given employee, there will be 1 match, for the second 2, for the third 3, and for the rth r. This means that per each employee, the total number of rows processed is (1 + 2 + 3 + … + r). That’s an arithmetic sequence, whose sum is (r + r2)/2. Therefore, the total number of rows processed is p(r + r2)/2. As an example, currently there are 10,000 partitions in the table, each with a size of 10. Therefore the total number of rows processed and aggregated is 10,000 × (10 + 102)/2 = 550,000. Notice that in the actual execution plan in Figure 1 this is exactly the number of rows that appear in the tooltip box reporting how many rows were returned by the Clustered Index Seek operator (total for all iterations). Later we’ll see what the implications of this complexity are on performance in terms of changes in the variables involved.
Set-Based Solution Using Joins
Listing 5 illustrates the approach using a join. As you can see, the logic is similar to that applied by the subquery-based approach. However, in the join approach, S1 and S2 are the left and right sides of the join, respectively. The same predicate that was used in the subquery’s filter (S2.empid = S1.empid AND S2.dt Figure 2 shows the execution plan for this solution.
SELECT S1.empid, S1.dt, S1.qty, SUM(S2.qty) AS sumqty FROM dbo.Sales AS S1 JOIN dbo.Sales AS S2 ON S2.empid = S1.empid AND S2.dt GROUP BY S1.empid, S1.dt, S1.qty;
There are several differences to note in this plan compared with the plan for the subquery approach. In this case the optimizer chose a hash join, as opposed to the nested loops join used in the previous case. Don’t let the fact that the plan shows only two scans of the data confuse you. One scan is used to build the hash table and another is used to probe the hash table, but the number of rows produced by the join that are later aggregated is the same as the number of rows processed by the nested loop join in the plan for the subquery solution. You can see this clearly in the tooltip box showing the number of rows streaming out of the Hash Match (Inner Join) operator. The complexity of the join-based solution in terms of the number of rows processed by the join operator and aggregated is p(r + r2)/2 as well.
Both plans in Figure 1 and Figure 2 were captured for the given sample data with 10,000 partitions and a partition size of 10 rows. It’s interesting to note that when playing with different numbers of partitions and different partition sizes, the optimizer in some of the cases used the nested loops algorithm for the join-based solution as well. This was especially the case when testing larger partition sizes.
Another difference to note is the point where the aggregate is calculated. With the subquery approach, the aggregate is calculated in the inner branch of the join, per each outer row separately. With the join approach, the aggregate is calculated after the join, based on the result of the join. You can also observe that parallelism is handled differently in both cases.
Effect of Number of Aggregates
When you evaluate a solution’s performance, you need to be able to predict the effect of changes to certain variables. In this section I describe the effect of changes to the number of aggregates requested; I describe changes to other variables in subsequent sections of the article.
Both the solutions I showed you earlier requested only one running aggregate. Can you predict the effect of changing the number of requested aggregates? To do so, you need to be aware of a certain limitation of the optimizer when it comes to optimizing subqueries. When optimizing a query with multiple subqueries, even when all subqueries need to access the same window of rows, unfortunately the optimizer uses a separate scan of the data for each subquery. This means that if you have a aggregates, the complexity of the subquery-based solution in terms of the number of rows that will be processed and aggregated can be expressed as: ap(r + r2)/2. With the join approach, on the other hand, any number of aggregates can be applied to the result of the join based on the same scan of the data. Therefore, the complexity of the join-based solution is not affected by the number of aggregates and remains p(r + r2)/2 even when multiple aggregates are involved. I’m not saying that calculating more aggregates doesn’t take more effort, but the relevant window of rows has to be scanned only once.
Let’s make things more tangible by running the solutions and measuring their performance. Note that I ran all solutions with results discarded in order not to include the time it takes to print the result in the output. The code in Listing 6 shows a query implementing the subquery-based approach, asking for four aggregates. Figure 3 shows the right-hand part of the execution plan for this query. Notice that in addition to the initial scan of the data representing the outer instance of Sales called S1, the relevant data is scanned and aggregated four times for the four aggregates. The query shown earlier in Listing 4 with the single aggregate ran for 733 milliseconds on my system, whereas this one ran for 2,753 milliseconds—almost four times longer.
SELECT empid, qty, val, (SELECT SUM(S2.qty) FROM dbo.Sales AS S2 WHERE S2.empid = S1.empid AND S2.dt (SELECT AVG(S2.qty) FROM dbo.Sales AS S2 WHERE S2.empid = S1.empid AND S2.dt (SELECT SUM(S2.val) FROM dbo.Sales AS S2 WHERE S2.empid = S1.empid AND S2.dt (SELECT AVG(S2.val) FROM dbo.Sales AS S2 WHERE S2.empid = S1.empid AND S2.dt FROM dbo.Sales AS S1;
The code in Listing 7 shows a query implementing the join-based approach, asking for four aggregates. The execution plan for this query is very similar to the one shown earlier in Figure 2 for the single aggregate request. You’ll find that the same amount of data is scanned and processed. Naturally, the operators that calculate the aggregates will have a bit more work involved, but the amount of data scanned and processed doesn’t change. The query shown earlier in Listing 5 with the single aggregate ran for 593 milliseconds on my system, whereas this one ran for 846 milliseconds. As you can see, with the join approach the penalty for requesting more aggregates isn’t very high. Therefore, in this sense, the join-based approach is probably better if you need multiple aggregates.
SELECT S1.empid, S1.qty, S1.val, SUM(S2.qty) AS sumqty, AVG(S2.qty) AS avgqty, SUM(S2.val) AS sumqty, AVG(S2.val) AS avgval FROM dbo.Sales AS S1 JOIN dbo.Sales AS S2 ON S2.empid = S1.empid AND S2.dt GROUP BY S1.empid, S1.dt, S1.qty, S1.val;
Effect of Number of Partitions
From the algorithmic complexity of the solutions, you should be able to predict the effect of changing the number of partitions (while keeping the partition size constant, of course). The complexity of both the subquery-based and join-based solutions is p(r + r2)/2, only the subquery solution is also affected by the number of requested aggregates. If the number of partitions grows, the expectation is for the performance to degrade linearly. For example, if the number of partitions increases by a factor of f, you expect the run time to increase by a factor of f since the total number of rows processed becomes pf(r + r2)/2. Now it’s up to a benchmark to see whether practice agrees with theory.
I ran a benchmark with number of partitions varying between 10,000 and 100,000, with a constant partition size of 10. Figure 4 shows the benchmark results, which confirm our expectations. Linear performance degradation is typically a good thing, of course. It’s also interesting to note that with such small partitions, the performance of the two approaches is quite similar, with the join approach being slightly faster.
Effect of Partition Size
Unlike the linear performance degradation caused by an increase in the number of partitions, the performance degradation resulting from an increase in the partition size is quadratic because of the r2 part of the expression describing the number of rows processed. If the partition size grows by a factor of f, the number of rows processed grows by a factor of close to f2 because the expression describing the number of rows processed becomes p(rf + (rf) 2)/2. Therefore, if the partition size increases by a factor of f, you should expect the run time to increase by a factor of f2.
To test this theory, I ran a benchmark with a partition size varying between 100 and 1,000, with a constant number of partitions of 1,000. Figure 5 shows the benchmark results, which again confirm our expectations. Quadratic performance degradation is of course typically a bad thing. Beyond very small partition sizes, the performance becomes quite horrible. It’s also interesting to note that with bigger partition sizes, the subquery approach tends to perform better; at least that’s the case in the test system I used, with a single processor with two cores. Of course, the results on your system might vary with different hardware, as the number of processors and other factors might have an important effect, especially when parallelism is handled differently between the solutions.
Subquery vs. Join
Both the subquery and join solutions perform reasonably well when very small partitions are involved (up to several hundred rows per partition). As partition size grows, the performance of these solutions degrades in a quadratic (N2) manner, becoming quite poor. But as long as the partitions are small, performance degradation caused by an increase in number of partitions is linear. One factor that might affect your choice between using a subquery-based or join-based solution is the number of aggregates requested. As I discussed, the subquery-based approach requires a separate scan of the data for each aggregate, whereas the join-based approach doesn’t—so you’ll most likely want to use the join approach when you need to calculate multiple aggregates. Next month I’ll continue exploring other solutions to running aggregates.