Loop example: aggregating data
You can use the Transformer stage to add aggregated information to output rows.
Aggregation operations make use of a cache that stores input rows. You can monitor the number of entries in the cache by setting a threshold level in the Loop Variable tab of the Stage Properties window. If the threshold is reached when the job runs, a warning is written to the job log, and the job continues to run.
Input row group aggregation included with input row data
You can save input rows to a cache area, so that you can process this data in a loop. For example, you have input data that has a column holding a price value. You want to add a column to the output rows. The new column indicates what percentage the price value is of the total value for prices in all rows in that group. The value for the new Percentage column is calculated by the following expression.
(price * 100)/sum of all prices in group
The input data is as shown in the following table.
Col1 | Col2 | Price |
---|---|---|
1000 | abc | 100.00 |
1000 | def | 20.00 |
1000 | ghi | 60.00 |
1000 | jkl | 20.00 |
2000 | zyx | 120.00 |
2000 | wvu | 110.00 |
2000 | tsr | 170.00 |
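The percentage for each row in the group where Col1 = 1000 is calculated by the following expression.
(price * 100)/200
The percentage for each row in the group where Col1 = 2000 is calculated by the following expression.
(price * 100)/400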
The output is shown in the following table.
Col1 | Col2 | Price | Percentage |
---|---|---|---|
1000 | abc | 100.00 | 50.00 |
1000 | def | 20.00 | 10.00 |
1000 | ghi | 60.00 | 30.00 |
1000 | jkl | 20.00 | 10.00 |
2000 | zyx | 120.00 | 30.00 |
2000 | wvu | 110.00 | 27.50 |
2000 | tsr | 170.00 | 42.50 |
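The arithmetic behind the Percentage column can be checked outside DataStage. The following Python sketch is only an illustration of the expression (price * 100)/sum of all prices in group, applied to the sample rows above; the function name add_percentage is hypothetical and is not part of the Transformer stage.

```python
from collections import defaultdict

# Sample input rows from the table above: (Col1, Col2, Price)
ROWS = [
    (1000, "abc", 100.00),
    (1000, "def", 20.00),
    (1000, "ghi", 60.00),
    (1000, "jkl", 20.00),
    (2000, "zyx", 120.00),
    (2000, "wvu", 110.00),
    (2000, "tsr", 170.00),
]


def add_percentage(rows):
    """Append (price * 100) / group total to every row (hypothetical helper)."""
    totals = defaultdict(float)
    for col1, _col2, price in rows:
        totals[col1] += price  # sum of all prices in the Col1 group
    return [
        (col1, col2, price, price * 100 / totals[col1])
        for col1, col2, price in rows
    ]


result = add_percentage(ROWS)
for row in result:
    print(row)
```

This two-pass version needs every row of a group before it can emit any of them, which is the reason the Transformer stage has to cache the input rows.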
To implement this scenario in the Transformer stage, make the following settings:
- Stage variable
- Define the following stage variables:
- NumSavedRows
- SaveInputRecord()
- IsBreak
- LastRowInGroup(inlink.Col1)
- TotalPrice
- IF IsBreak THEN SummingPrice + inlink.Price ELSE 0
- SummingPrice
- IF IsBreak THEN 0 ELSE SummingPrice + inlink.Price
- NumRows
- IF IsBreak THEN NumSavedRows ELSE 0
- Loop condition
- Enter the following expression as the loop condition:
@ITERATION <= NumRows
The loop continues to iterate for the count specified in the NumRows variable.
- Loop variables
- Define the following loop variable:
- SavedRowIndex
- GetSavedInputRecord()
- Output link metadata and derivations
- Define the output link columns and their derivations:
- Col1 - inlink.Col1
- Col2 - inlink.Col2
- Price - inlink.Price
- Percentage - (inlink.Price * 100)/TotalPrice
SaveInputRecord() is called in the first stage variable (NumSavedRows). It saves the current input row in the cache and returns the count of records currently in the cache. Each input row in a group is saved until the break value is reached. At the last row in the group, NumRows is set to the number of rows stored in the input cache. The loop then iterates the number of times specified by NumRows, calling GetSavedInputRecord() on each iteration to make the next saved input row current before that row is re-processed to create an output row. The inlink columns used in the output link derivations refer to their values in the currently retrieved input row, so they change on each loop iteration.
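The evaluation order described above can be traced with a small Python model. This is a sketch only, not how the engine is implemented. It assumes that stage variables are evaluated top to bottom for each input row, that the cache is first-in, first-out, and that the input is already sorted by Col1. The name run_transformer and the list standing in for the cache are hypothetical.

```python
ROWS = [
    (1000, "abc", 100.00), (1000, "def", 20.00),
    (1000, "ghi", 60.00), (1000, "jkl", 20.00),
    (2000, "zyx", 120.00), (2000, "wvu", 110.00),
    (2000, "tsr", 170.00),
]


def run_transformer(rows):
    """Model the stage variables, loop condition, and output derivations."""
    cache = []           # stands in for the input row cache
    summing_price = 0.0  # SummingPrice keeps its value between input rows
    output = []
    for i, row in enumerate(rows):
        col1, _col2, price = row
        # NumSavedRows = SaveInputRecord(): save the row, return the cache count
        cache.append(row)
        num_saved_rows = len(cache)
        # IsBreak = LastRowInGroup(inlink.Col1)
        is_break = i == len(rows) - 1 or rows[i + 1][0] != col1
        # TotalPrice is evaluated before SummingPrice, so it sees the running sum
        total_price = summing_price + price if is_break else 0.0
        summing_price = 0.0 if is_break else summing_price + price
        # NumRows is nonzero only on the last row of a group
        num_rows = num_saved_rows if is_break else 0
        # Loop condition: @ITERATION <= NumRows
        for _iteration in range(num_rows):
            c1, c2, p = cache.pop(0)  # GetSavedInputRecord()
            output.append((c1, c2, p, p * 100 / total_price))
    return output


output = run_transformer(ROWS)
for out_row in output:
    print(out_row)
```

For every row except the last in a group, NumRows is 0, so the loop body never runs and no output is produced. All output rows for a group are emitted when the break row arrives.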
Caching selected input rows
You can call SaveInputRecord() within an expression, so that input rows are saved in the cache only when the expression evaluates as true.
For example, you can implement the scenario described, but save only input rows where the price column is not 0. The settings are as follows:
- Stage variable
- Define the following stage variables:
- IgnoreRow
- IF (inlink.Price = 0) THEN 1 ELSE 0
- NumSavedRows
- IF IgnoreRow THEN SavedRowSum ELSE SaveInputRecord()
- IsBreak
- LastRowInGroup(inlink.Col1)
- SavedRowSum
- IF IsBreak THEN 0 ELSE NumSavedRows
- TotalPrice
- IF IsBreak THEN SummingPrice + inlink.Price ELSE 0
- SummingPrice
- IF IsBreak THEN 0 ELSE SummingPrice + inlink.Price
- NumRows
- IF IsBreak THEN NumSavedRows ELSE 0
- Loop condition
- Enter the following expression as the loop condition:
@ITERATION <= NumRows
- Loop variables
- Define the following loop variable:
- SavedRowIndex
- GetSavedInputRecord()
- Output link metadata and derivations
- Define the output link columns and their derivations:
- Col1 - inlink.Col1
- Col2 - inlink.Col2
- Price - inlink.Price
- Percentage - (inlink.Price * 100)/TotalPrice
This example produces output similar to the previous example, but the aggregation does not include Price values of 0, and no output rows with a Price value of 0 are produced.
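The effect of the conditional call can be sketched in Python. The model below makes the same assumptions as the settings suggest: stage variables are evaluated in the listed order, SavedRowSum carries the previous row's saved count, and the cache is first-in, first-out. The input rows with a Price of 0 are invented for this illustration and do not come from the tables in this topic. The name run_selective is hypothetical.

```python
# Illustrative input (not from the tables above): two rows have a Price of 0
ROWS = [
    (1000, "abc", 100.00),
    (1000, "nil", 0.00),
    (1000, "def", 100.00),
    (2000, "zyx", 0.00),
    (2000, "wvu", 50.00),
]


def run_selective(rows):
    """Model the settings that cache only rows with a nonzero Price."""
    cache = []
    saved_row_sum = 0    # SavedRowSum: saved count carried from the previous row
    summing_price = 0.0  # SummingPrice
    output = []
    for i, row in enumerate(rows):
        col1, _col2, price = row
        ignore_row = price == 0  # IgnoreRow
        if ignore_row:
            num_saved_rows = saved_row_sum  # nothing saved, keep the old count
        else:
            cache.append(row)               # SaveInputRecord()
            num_saved_rows = len(cache)
        is_break = i == len(rows) - 1 or rows[i + 1][0] != col1
        saved_row_sum = 0 if is_break else num_saved_rows
        total_price = summing_price + price if is_break else 0.0
        summing_price = 0.0 if is_break else summing_price + price
        num_rows = num_saved_rows if is_break else 0
        for _iteration in range(num_rows):
            c1, c2, p = cache.pop(0)        # GetSavedInputRecord()
            output.append((c1, c2, p, p * 100 / total_price))
    return output


output = run_selective(ROWS)
for out_row in output:
    print(out_row)
```

Because ignored rows are never saved, they are never retrieved in the loop, so they cannot appear on the output link. A group in which every row is ignored produces NumRows = 0, so the division by TotalPrice is never evaluated for it in this model.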
Outputting additional generated rows
This example is based on the first example, but, in this case, you want to identify any input row where the Price is greater than or equal to 100. If an input row has a Price greater than or equal to 100, then a 25% discount is applied to the Price and an additional output row is generated. The Col1 value in the new row has 1 added to it to indicate an extra discount entry. The original input row is still output as normal. Therefore, any input row with a Price greater than or equal to 100 produces two output rows, one with the discounted price and one without.
The input data is as shown in the following table:
Col1 | Col2 | Price |
---|---|---|
1000 | abc | 100.00 |
1000 | def | 20.00 |
1000 | ghi | 60.00 |
1000 | jkl | 20.00 |
2000 | zyx | 120.00 |
2000 | wvu | 110.00 |
2000 | tsr | 170.00 |
The output data is as shown in the following table:
Col1 | Col2 | Price | Percentage |
---|---|---|---|
1000 | abc | 100.00 | 50.00 |
1001 | abc | 75.00 | 50.00 |
1000 | def | 20.00 | 10.00 |
1000 | ghi | 60.00 | 30.00 |
1000 | jkl | 20.00 | 10.00 |
2000 | zyx | 120.00 | 30.00 |
2001 | zyx | 90.00 | 30.00 |
2000 | wvu | 110.00 | 27.50 |
2001 | wvu | 82.50 | 27.50 |
2000 | tsr | 170.00 | 42.50 |
2001 | tsr | 127.50 | 42.50 |
To implement this scenario in the Transformer stage, make the following settings:
- Stage variable
- Define the following stage variables:
- NumSavedRowInt
- SaveInputRecord()
- AddRow
- IF (inlink.Price >= 100) THEN 1 ELSE 0
- NumSavedRows
- IF AddRow THEN SaveInputRecord() ELSE NumSavedRowInt
- IsBreak
- LastRowInGroup(inlink.Col1)
- TotalPrice
- IF IsBreak THEN SummingPrice + inlink.Price ELSE 0
- SummingPrice
- IF IsBreak THEN 0 ELSE SummingPrice + inlink.Price
- NumRows
- IF IsBreak THEN NumSavedRows ELSE 0
- Loop condition
- Enter the following expression as the loop condition:
@ITERATION <= NumRows
The loop continues to iterate for the count specified in the NumRows variable.
- Loop variables
- Define the following loop variables:
- SavedRowIndex
- GetSavedInputRecord()
- AddedRow
- LastAddedRow
- LastAddedRow
- IF (inlink.Price < 100) THEN 0 ELSE IF (AddedRow = 0) THEN 1 ELSE 0
- Output link metadata and derivations
- Define the output link columns and their derivations:
- Col1 - IF (inlink.Price < 100) THEN inlink.Col1 ELSE IF (AddedRow = 0) THEN inlink.Col1 ELSE inlink.Col1 + 1
- Col2 - inlink.Col2
- Price - IF (inlink.Price < 100) THEN inlink.Price ELSE IF (AddedRow = 0) THEN inlink.Price ELSE inlink.Price * 0.75
- Percentage - (inlink.Price * 100)/TotalPrice
SaveInputRecord() is called either once or twice for each input row, depending on the value of Price. When SaveInputRecord() is called twice, the row is stored in the cache twice, so in addition to the normal aggregation, the loop produces the extra output record with the recalculated Price value. The loop variable AddedRow is used to evaluate the output column values differently for each of the duplicate input rows.
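The interaction between the double save and the AddedRow and LastAddedRow loop variables can be followed in a Python model. This is a sketch under assumptions: loop variables are evaluated in the listed order on every iteration, they keep their values between iterations and start at 0, and the cache is first-in, first-out. The name run_with_discount_rows is hypothetical.

```python
ROWS = [
    (1000, "abc", 100.00), (1000, "def", 20.00),
    (1000, "ghi", 60.00), (1000, "jkl", 20.00),
    (2000, "zyx", 120.00), (2000, "wvu", 110.00),
    (2000, "tsr", 170.00),
]


def run_with_discount_rows(rows):
    """Model the settings that emit an extra discounted row for Price >= 100."""
    cache = []
    summing_price = 0.0
    last_added_row = 0  # LastAddedRow loop variable, assumed to start at 0
    output = []
    for i, row in enumerate(rows):
        col1, _col2, price = row
        cache.append(row)              # NumSavedRowInt = SaveInputRecord()
        num_saved_row_int = len(cache)
        add_row = price >= 100         # AddRow
        if add_row:
            cache.append(row)          # second SaveInputRecord() for this row
            num_saved_rows = len(cache)
        else:
            num_saved_rows = num_saved_row_int
        is_break = i == len(rows) - 1 or rows[i + 1][0] != col1
        total_price = summing_price + price if is_break else 0.0
        summing_price = 0.0 if is_break else summing_price + price
        num_rows = num_saved_rows if is_break else 0
        for _iteration in range(num_rows):
            c1, c2, p = cache.pop(0)   # SavedRowIndex = GetSavedInputRecord()
            added_row = last_added_row  # AddedRow = LastAddedRow
            # LastAddedRow flips to 1 after the first copy of a duplicated row
            last_added_row = 0 if p < 100 else (1 if added_row == 0 else 0)
            second_copy = p >= 100 and added_row != 0
            out_col1 = c1 + 1 if second_copy else c1
            out_price = p * 0.75 if second_copy else p
            output.append((out_col1, c2, out_price, p * 100 / total_price))
    return output


output = run_with_discount_rows(ROWS)
for out_row in output:
    print(out_row)
```

SummingPrice is updated once per input row, not once per saved copy, which is why the Percentage values match the first example even though some rows are cached twice.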
Runtime errors
The number of calls to SaveInputRecord() and GetSavedInputRecord() must match for each loop. You can call SaveInputRecord() multiple times to add to the cache, but once you call GetSavedInputRecord(), you must call it enough times to empty the input cache before you can call SaveInputRecord() again. A Transformer stage that does not observe this rule can generate runtime errors in the following circumstances:
- If your Transformer stage calls GetSavedInputRecord() before SaveInputRecord(), then a fatal error similar to the following example is reported in the job log:
APT_CombinedOperatorController,0: Fatal Error: get_record() called on record 1 but only 0 records saved by save_record()
- If your Transformer stage calls GetSavedInputRecord() more times than SaveInputRecord() is called, then a fatal error similar to the following example is reported in the job log:
APT_CombinedOperatorController,0: Fatal Error: get_record() called on record 3 but only 2 records saved by save_record()
- If your Transformer stage calls SaveInputRecord() but does not call GetSavedInputRecord(), then a fatal error similar to the following example is reported in the job log:
APT_CombinedOperatorController,0: Fatal Error: save_record() called on record 3, but only 0 records retrieved by get_record()
- If your Transformer stage does not call GetSavedInputRecord() as many times as SaveInputRecord(), then a fatal error similar to the following example is reported in the job log:
APT_CombinedOperatorController,0: Fatal Error: save_record() called on record 3, but only 2 records retrieved by get_record()
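The save-then-drain rule can be expressed as a small Python class. This is a hypothetical model of the rule as stated in this section, not the engine's implementation. The class name InputRowCache and its error messages are invented for illustration and are not the messages that appear in the job log. The model covers only two situations: retrieving more rows than were saved, and saving again before the cache has been emptied.

```python
class InputRowCache:
    """Hypothetical model of the SaveInputRecord/GetSavedInputRecord rule."""

    def __init__(self):
        self._rows = []
        self._draining = False  # True once retrieval has started

    def save(self, row):
        """Stand-in for SaveInputRecord(): returns the cache count."""
        if self._draining and self._rows:
            # Retrieval started but the cache was not emptied first
            raise RuntimeError("save attempted before the cache was emptied")
        self._draining = False
        self._rows.append(row)
        return len(self._rows)

    def get(self):
        """Stand-in for GetSavedInputRecord(): returns the oldest saved row."""
        if not self._rows:
            raise RuntimeError("get attempted with no saved rows left")
        self._draining = True
        return self._rows.pop(0)


cache = InputRowCache()
print(cache.save("row1"))  # count after the first save
print(cache.save("row2"))  # several saves in a row are allowed
print(cache.get())         # retrieval starts, so the cache must now be emptied
try:
    cache.save("row3")     # violates the rule: one row is still cached
except RuntimeError as err:
    print("error:", err)
print(cache.get())         # cache is empty again, saving is allowed
```

In the three examples in this topic, the rule holds because NumRows is always set to the count returned by the last SaveInputRecord() call in the group, so the loop retrieves exactly as many rows as were saved.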