A Pipeline represents a series of steps that are performed on one or more rows. Rows are Map objects that are passed from the Source through each step individually. A step can modify a row, transform it into something else, or reject it. If a row is rejected in any step, no further steps are processed and the row is passed to the rejections Pipeline (if one exists). Any row that passes through the entire Pipeline is said to have been loaded; any row that does not pass through all steps is said to be rejected. Rows originate from the Pipeline's Source, which sends rows into the Pipeline until it finishes. The Pipeline keeps a set of statistics about how many rows were loaded and rejected, the types of rejections, timing, and so on. These are kept in a LoadStatistic instance and returned from the Pipeline.go method. Example:
```groovy
LoadStatistic stats = http("http://api.open-notify.org/astros.json")
    .inject { json -> json.people }
    .printRow()
    .go()
```

In the example above an HttpSource fetches JSON data, which is returned as a Map containing other nested objects. The inject step pulls out the "people" column, a Collection of people objects, and injects each member into the downstream steps, where printRow prints it to the console. The output would look like the following:
```
[name:Sergey Prokopyev, craft:ISS]
[name:Alexander Gerst, craft:ISS]
[name:Serena Aunon-Chancellor, craft:ISS]
===>
----
==> inject() loaded 3 rejected 0 took 1 ms
```
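Rejection handling can be sketched as follows. This is a minimal, hedged example: it assumes the static `create` helper documented below and a `gratum.etl.Pipeline` import path, which is not shown on this page.

```groovy
import static gratum.etl.Pipeline.create

// Feed two hypothetical rows into a pipeline; reject rows with a blank name.
LoadStatistic stats = create("astronauts") { emit ->
        emit([name: "Sergey Prokopyev", craft: "ISS"])
        emit([name: "", craft: "ISS"])
    }
    .filter("has name") { row -> row.name }  // false -> row is rejected
    .onRejection { rejections ->
        rejections.printRow()                // inspect rejected rows here
    }
    .go()
```

The blank-name row never reaches any step after the filter; it is routed to the rejections pipeline, and the returned LoadStatistic records the loaded and rejected counts.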
| Modifiers | Name | Description |
|---|---|---|
| static int | DO_NOT_TRACK | |
| static java.lang.String | REJECTED_KEY | |
| static org.slf4j.Logger | logger | |
| Type Params | Return Type | Name and description |
|---|---|---|
|  | void | addDefaultRejections() |
|  | Pipeline | addField(java.lang.String fieldName, groovy.lang.Closure fieldValue) Adds a new field to each row with the value returned by the given closure. |
|  | Pipeline | addStep(java.lang.CharSequence name = null, groovy.lang.Closure<java.util.Map<java.lang.String, java.lang.Object>> step) Adds a step to the pipeline. |
|  | Pipeline | after(groovy.lang.Closure<java.lang.Void> step) Adds a closure to the end of the Pipeline. |
|  | Pipeline | apply(groovy.lang.Closure<Pipeline> applyToPipeline) Passed a closure that is called with this Pipeline. |
|  | Pipeline | asBoolean(java.lang.String column) Parses the string value at the given column into a java.lang.Boolean value. |
|  | Pipeline | asDate(java.lang.String column, java.lang.String... formats = ["yyyy-MM-dd"]) Parses the string at the given column name into a Date object using the given format. |
|  | Pipeline | asDouble(java.lang.String column) Returns a Pipeline where the given column is converted from a string to a java.lang.Double. |
|  | Pipeline | asInt(java.lang.String column) Parses the string value at the given column into a java.lang.Integer value. |
|  | Pipeline | branch(java.lang.CharSequence branchName = "branch", groovy.lang.Closure<Pipeline> split) Copies all rows on this Pipeline to another Pipeline that is passed to the given closure. |
|  | Pipeline | branch(java.util.Map<java.lang.String, ?> condition, groovy.lang.Closure<Pipeline> split) Copies all rows on this Pipeline to another Pipeline where the given condition is true. |
|  | Pipeline | clip(java.lang.String... columns) Removes all columns from each row so that only the fields given will be returned. |
|  | Pipeline | concat(Pipeline src) Concatenates the rows from this pipeline and the given pipeline. |
|  | static Pipeline | create(java.lang.CharSequence name, groovy.lang.Closure startClosure) Creates a pipeline where startClosure is the source. |
|  | Pipeline | decryptPgp(java.lang.String streamProperty, groovy.lang.Closure configure) Decrypts a stream on the Pipeline using PGP and rewrites the stream back onto the Pipeline. |
|  | Pipeline | defaultValues(java.util.Map<java.lang.String, java.lang.Object> defaults) Sets the destination column to the value within the given Map when the destination column is empty/null. |
|  | Pipeline | defaultsBy(java.util.Map<java.lang.String, java.lang.String> defaults) Sets the destination column to the value of the source column if it is empty or null. |
|  | void | doRejections(java.util.Map<java.lang.String, java.lang.Object> current, java.lang.CharSequence stepName, int lineNumber) |
|  | Pipeline | encryptPgp(java.lang.String streamProperty, groovy.lang.Closure configure) Encrypts a stream on the pipeline using PGP and rewrites that stream back to the pipeline. |
|  | Pipeline | exchange(groovy.lang.Closure<Pipeline> closure) Takes a closure that returns a Pipeline which is used to feed the Pipeline returned by exchange(). |
|  | Pipeline | fillDownBy(groovy.lang.Closure<java.lang.Boolean> decider) Returns a Pipeline where rows with empty columns are filled in using the values in the previous row, depending on what the given closure returns. |
|  | Pipeline | filter(java.lang.CharSequence name = "filter()", groovy.lang.Closure callback) Adds a step to the Pipeline that passes all rows where the given closure returns true to the next step on the pipeline. |
|  | Pipeline | filter(java.util.Map columns) Adds a step to the Pipeline that passes all rows where the values of the columns on the given Map are equal to the columns in the row. |
|  | void | finished() |
|  | Pipeline | flattenWindow(java.lang.String field, groovy.lang.Closure<java.util.List<java.util.Map<java.lang.String, java.lang.Object>>> windowClosure) Collects, or flattens, a set of rows that share the same value in succession within the stream into a List of those matching rows. |
|  | java.lang.CharSequence | getName() The name of the pipeline. |
|  | LoadStatistic | go() Starts processing the rows returned from the Source into the Pipeline. |
|  | Pipeline | groupBy(java.lang.String... columns) Returns a Pipeline where the rows are grouped by the given columns. |
|  | Pipeline | inject(java.lang.CharSequence name, groovy.lang.Closure<java.lang.Iterable<java.util.Map<java.lang.String, java.lang.Object>>> closure) Injects the Collection<Map> returned from the given closure into the downstream steps as individual rows. |
|  | Pipeline | inject(groovy.lang.Closure closure) Delegates to inject(java.lang.String, groovy.lang.Closure) with a default name. |
|  | Pipeline | intersect(Pipeline other, java.lang.Object columns) Returns a Pipeline with all of the rows from this Pipeline plus a single column "included" whose true/false value indicates whether the current row occurs in the given Pipeline, where the values of the specified columns are equal in both Pipelines. |
|  | Pipeline | join(Pipeline other, java.lang.Object columns, boolean left = false) Returns a Pipeline that joins the columns from this Pipeline with the given Pipeline where the columns are equal. |
|  | Pipeline | json(java.lang.String filename, java.util.List<java.lang.String> columns = null) Writes out the rows produced to a JSON file. |
|  | Pipeline | jsonl(java.lang.String filename, java.util.List<java.lang.String> columns = null) Writes out the rows produced into the given filename in JSON Lines format. |
|  | Pipeline | jsonl(java.io.File file, java.util.List<java.lang.String> columns = null) Writes out the rows produced into the given File in JSON Lines format. |
|  | Pipeline | limit(long limit, boolean halt = true) Limits the number of rows taken from a source to an upper bound. |
|  | gratum.csv.CSVFile | mergePage(gratum.csv.CSVFile page1, gratum.csv.CSVFile page2, java.util.Comparator<java.util.Map<java.lang.String, java.lang.Object>> comparator) |
|  | Pipeline | onRejection(groovy.lang.Closure<Pipeline> configure) Takes a closure that is passed the rejection Pipeline. |
|  | Pipeline | prependStep(java.lang.CharSequence name = null, groovy.lang.Closure<java.util.Map<java.lang.String, java.lang.Object>> step) Prepends a step to the pipeline. |
|  | Pipeline | printRow(java.lang.String... columns) Prints the values of the given columns for each row to the console, or all columns if no columns are given. |
|  | boolean | process(java.util.Map row, int lineNumber = -1) Sends rows to the Pipeline for processing. |
|  | Pipeline | progress(int col = 50) |
|  | Pipeline | reduce(java.lang.String name, java.util.Map<java.lang.String, java.lang.Object> value, groovy.lang.Closure<java.util.Map<java.lang.String, java.lang.Object>> logic) Reduces all upstream rows into a value that is passed into the given closure (similar to Groovy's inject method). |
|  | static Rejection | reject(java.lang.String reason, RejectionCategory category = RejectionCategory.REJECTION) Helper method to create a Rejection object. |
|  | static java.util.Map | reject(java.util.Map<java.lang.String, java.lang.Object> row, java.lang.String reason, RejectionCategory category = RejectionCategory.REJECTION) |
|  | void | reject(java.util.Map<java.lang.String, java.lang.Object> row, int lineNumber = -1) |
|  | Pipeline | removeField(java.lang.String fieldName, groovy.lang.Closure removeLogic = null) Removes a field based on whether the given closure returns true or false. |
|  | Pipeline | renameFields(java.util.Map fieldNames) Renames a row's columns in the given map to the value of the corresponding key. |
|  | Pipeline | replaceAll(java.lang.String column, java.util.regex.Pattern regEx, java.lang.String withClause) Replaces all occurrences of the regular expression with the given withClause. |
|  | Pipeline | replaceValues(java.lang.String column, java.util.Map<java.lang.String, java.lang.String> values) Replaces all values at the given column that match a key of the values Map with the corresponding Map value. |
|  | Pipeline | save(java.lang.String filename, java.lang.String separator = ",", java.util.List<java.lang.String> columns = null) Writes each row to the specified filename as a CSV separated by the given separator. |
|  | Pipeline | save(java.io.File csvFile, java.lang.String separator = ",", java.util.List<java.lang.String> columns = null) Writes each row to the specified file as a CSV separated by the given separator. |
|  | Pipeline | save(Sink<java.util.Map<java.lang.String, java.lang.Object>> sink) Writes the rows to a provided Sink. |
|  | Pipeline | setField(java.lang.String fieldName, java.lang.Object value) Sets a fieldName in each row to the given value. |
|  | Pipeline | sort(groovy.lang.Tuple2<java.lang.String, SortOrder>... columns) Returns a Pipeline where the rows are ordered by the given columns and sort order. |
|  | Pipeline | sort(java.lang.CharSequence name, groovy.lang.Closure configure) Sorts the rows according to a configured comparator. |
|  | Pipeline | sort(java.lang.String... columns) Returns a Pipeline where the rows are ordered by the given columns. |
|  | Pipeline | source(Source source) Assigns a new source to a Pipeline and returns the pipeline. |
|  | void | start() Starts processing rows from the source of the pipeline. |
|  | LoadStatistic | toLoadStatistic(long start, long end) |
|  | Pipeline | trim() Returns a Pipeline where all white space is removed from all columns contained within the rows. |
|  | Pipeline | unique(java.lang.String column) Only allows rows that are unique per the given column. |
| Methods inherited from class | Name |
|---|---|
| java.lang.Object | java.lang.Object#wait(long, int), java.lang.Object#wait(long), java.lang.Object#wait(), java.lang.Object#equals(java.lang.Object), java.lang.Object#toString(), java.lang.Object#hashCode(), java.lang.Object#getClass(), java.lang.Object#notify(), java.lang.Object#notifyAll() |
**addField(java.lang.String fieldName, groovy.lang.Closure fieldValue)**
Adds a new field to each row with the value returned by the given closure.
fieldName
- The new field name to add
fieldValue
- The closure that returns a value to set the given field's name to.

**addStep(java.lang.CharSequence name = null, groovy.lang.Closure step)**
Adds a step to the pipeline. It's passed an optional name to identify the step by, and a closure that represents the individual step. The closure returns the Map to be processed by the next step in the pipeline; typically it simply returns the same row it was passed. If it returns null or a Rejection, the row is rejected, no additional steps are processed, and the current row is passed to the rejections pipeline.
name
- The step name
step
- The code used to process each row on the Pipeline.

**after(groovy.lang.Closure step)**
Adds a closure to the end of the Pipeline. This is called after all rows are processed. The closure is invoked without any arguments.
step
- The Closure that is invoked after all rows have been processed.

**apply(groovy.lang.Closure applyToPipeline)**
Passed a closure that is called with this Pipeline. This enables you to perform operations on the Pipeline itself without breaking the flow of the functional chain. This does not add a step to the Pipeline.
applyToPipeline
- The Closure that is called with this Pipeline.

**asBoolean(java.lang.String column)**
Parses the string value at the given column into a java.lang.Boolean value. It understands values like Y/N, YES/NO, TRUE/FALSE, 1/0, and T/F.
column
- The column containing a string to be turned into a java.lang.Boolean

**asDate(java.lang.String column, java.lang.String... formats = ["yyyy-MM-dd"])**
Parses the string at the given column name into a Date object using the given format. Any value that cannot be parsed by the format is rejected. Null values or empty strings are not rejected.
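A custom step added with addStep can be sketched like this. It is a hedged example: `pipeline` stands for any Pipeline already wired to a source, the `craft` column is hypothetical, and `reject` is the static helper documented further down this page.

```groovy
pipeline.addStep("normalize craft") { Map row ->
    if (!row.craft) {
        // Returning a rejected row stops downstream steps for this row.
        return reject(row, "Missing craft", RejectionCategory.REJECTION)
    }
    row.craft = row.craft.toString().toUpperCase()
    return row   // pass the (possibly modified) row to the next step
}
```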
column
- The field to use to find the string value to parse
formats
- One or more formats used to parse the string into a java.util.Date. The first format that parses without exception will be used. (default format is "yyyy-MM-dd")

**asDouble(java.lang.String column)**
Returns a Pipeline where the given column is converted from a string to a java.lang.Double.
column
- The name of the column to convert into a Double

**asInt(java.lang.String column)**
Parses the string value at the given column into a java.lang.Integer value.
column
- The column containing a string to be turned into a java.lang.Integer

**branch(java.lang.CharSequence branchName = "branch", groovy.lang.Closure split)**
Copies all rows on this Pipeline to another Pipeline that is passed to the given closure. The given closure can configure additional steps on the branched Pipeline. The rows passed through this Pipeline are not modified.
split
- The closure that is passed a new Pipeline onto which all the rows from this Pipeline are copied.

**branch(java.util.Map condition, groovy.lang.Closure split)**
Copies all rows on this Pipeline to another Pipeline where the given condition is true. The given condition works the same way filter(java.util.Map) does. This is a combination of branch and filter. No rows on this pipeline are filtered out; only the rows on the branch are filtered.
condition
- The conditions that must be equal in order for the row to be copied to the branch.
split
- The closure that is passed the branch Pipeline.

**clip(java.lang.String... columns)**
Removes all columns from each row so that only the fields given will be returned.
columns
- The column names to retain from each row

**concat(Pipeline src)**
Concatenates the rows from this pipeline and the given pipeline. The resulting Pipeline will process all rows from this pipeline and the src pipeline.
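branch and clip can be combined as in this sketch; `pipeline` is assumed to already carry rows with hypothetical `name` and `craft` columns.

```groovy
pipeline
    .branch([craft: "ISS"]) { Pipeline iss ->
        // Only rows matching the condition reach this branch.
        iss.clip("name").printRow()
    }
    .printRow()   // the main pipeline still sees every row, unmodified
```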
src
- The pipeline to concatenate with this one.

**create(java.lang.CharSequence name, groovy.lang.Closure startClosure)**
Creates a pipeline where startClosure is the source. The startClosure is passed another closure that it can use to pass an individual row to the Pipeline.
name
- The name of the pipeline to create
startClosure
- A closure that is called with a closure; startClosure can call that argument to send a row into the pipeline.

**decryptPgp(java.lang.String streamProperty, groovy.lang.Closure configure)**
Decrypts a stream on the Pipeline using PGP and rewrites the stream back onto the Pipeline. It looks for a stream at the given streamProperty. Further configuration is performed by the provided Closure, which is passed a gratum.pgp.PgpContext. You are required to set up the identity passphrase and the secret key collection used to decrypt. It also adds the file and filename properties to the existing row.
streamProperty
- The property within the row on the Pipeline that stores a stream.
configure
- The closure called with a PgpContext object to further configure how it will decrypt the stream.

**defaultValues(java.util.Map defaults)**
Sets the destination column to the value within the given Map when the destination column is empty/null.
defaults
- A Map where the key is the destination column and the value is the default used when the destination column is empty/null.

**defaultsBy(java.util.Map defaults)**
Sets the destination column to the value of the source column if it is empty or null.
defaults
- A Map containing the destination column as the key and the source column as the value.

**encryptPgp(java.lang.String streamProperty, groovy.lang.Closure configure)**
Encrypts a stream on the pipeline using PGP and rewrites that stream back to the pipeline. It looks for a stream on the Pipeline at streamProperty. Further configuration is performed by the provided Closure, which is passed a gratum.pgp.PgpContext. You are required to set up the identities, secret key collection, and/or public key collection in order to encrypt. This writes the encrypted stream back to the Pipeline on the provided streamProperty. It also adds the file and filename properties to the existing row.
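defaultValues and defaultsBy can be sketched together; the column names here are hypothetical and `pipeline` stands for any existing Pipeline.

```groovy
pipeline
    .defaultValues(craft: "Unknown")     // constant default when craft is empty/null
    .defaultsBy(displayName: "name")     // copy name into displayName when it is empty/null
```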
streamProperty
- The property that holds a stream object to be encrypted.
configure
- The Closure that is passed the PgpContext used to configure how the stream will be encrypted.

**exchange(groovy.lang.Closure closure)**
Takes a closure that returns a Pipeline which is used to feed the Pipeline returned by exchange(). The closure is called for each row emitted from this Pipeline, so the closure may create multiple Pipelines, and all data from every Pipeline is fed into the returned Pipeline.
closure
- A closure that takes a Map and returns a Pipeline whose data will be fed onto the returned pipeline.

**fillDownBy(groovy.lang.Closure decider)**
Returns a Pipeline where rows with empty columns are filled in using the values in the previous row, depending on what the given closure returns. If the closure returns true then any empty column (value == null or value.isEmpty()) will be populated by the values in the previous row.
decider
- A Closure which decides if the values from a prior row will be used to fill in missing values in the current row.

**filter(java.lang.CharSequence name = "filter()", groovy.lang.Closure callback)**
Adds a step to the Pipeline that passes all rows where the given closure returns true to the next step on the pipeline. All rows where the closure returns false are rejected.
callback
- A callback that is passed a row and returns a boolean. All rows for which it returns false are rejected.

**filter(java.util.Map columns)**
Adds a step to the Pipeline that passes all rows where the values of the columns on the given Map are equal to the columns in the row. This is a boolean AND between columns. For example:
.filter( [ hair: 'Brown', eyeColor: 'Blue' ] )
In this example all rows where hair = Brown AND eyeColor = Blue are passed through the filter. You can also pass a java.util.Collection of values to compare a column to. For example:
.filter( [hair: ['Brown','Blonde'], eyeColor: ['Blue','Green'] ] )
In this example you need hair of 'Brown' or 'Blonde' and eyeColor of 'Blue' or 'Green'. You can also pass a groovy.lang.Closure as a value for a column, and it will filter based on the value returned by the Closure. For example:
.filter( [hair: { String v -> v.startsWith('B') }, eyeColor: 'Brown'] )
In this example it invokes the Closure with the value at row['hair'], and the Closure evaluates to a boolean that decides whether the row passes. Finally, the wildcard form lets you embed a closure that receives the entire row, allowing much more complex queries, such as OR logic between multiple fields:
.filter( "*": { Map row -> row['hair'] == "Brown" || row['eyeColor'] == 'Blue' } )
columns
- A Map that contains the columns and the values that are passed through

**flattenWindow(java.lang.String field, groovy.lang.Closure windowClosure)**
Collects, or flattens, a set of rows that share the same value in succession within the stream into a List of those matching rows. This groups rows that share the same value of the given field name, then invokes the given windowClosure passing the list of grouped rows. The windowClosure returns a list of the rows it wants to continue downstream. This works best with data that has a somewhat predictable order for the given field name. The flattenWindow operation caches rows that share the same value for the given field; once it encounters a row whose value differs, it invokes the windowClosure with the cached like-valued rows, then begins caching rows that match the new value and repeats.
field
- The field name to look for common values within.
windowClosure
- The logic used to process the rows that share a successive common value in the field name.

**getName()**
The name of the pipeline. This is used by LoadStatistic to identify which Pipeline it came from.
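The four filter(Map) forms described above, collected in one sketch (`pipeline` and the column names are hypothetical):

```groovy
pipeline.filter(hair: "Brown", eyeColor: "Blue")           // AND of equality checks
pipeline.filter(hair: ["Brown", "Blonde"])                 // value must be in the Collection
pipeline.filter(hair: { String v -> v.startsWith("B") })   // per-column Closure test
pipeline.filter("*": { Map row ->                          // wildcard: whole-row Closure, enables OR
    row.hair == "Brown" || row.eyeColor == "Blue"
})
```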
**go()**
Starts processing the rows returned from the Source into the Pipeline. When the Source is finished this method returns the LoadStatistic for this Pipeline.

**groupBy(java.lang.String... columns)**
Returns a Pipeline where the rows are grouped by the given columns. The resulting Pipeline will return only a single row, where the keys of that row come from the first column passed to the groupBy() method. All other columns given will occur under the respective keys. This yields a tree-like structure whose height equals columns.length. The leaves of the tree are the rows that matched all of their parents.
columns
- The columns to group each row by.

**inject(java.lang.CharSequence name, groovy.lang.Closure closure)**
Injects the Collection<Map> returned from the given closure into the downstream steps as individual rows. The given closure is called for every row passed through the preceding step. Each member of the returned collection will be fed into downstream steps as a separate row.
name
- The name of the step
closure
- Takes a Map and returns a Collection<Map> that will be fed into the downstream steps

**inject(groovy.lang.Closure closure)**
Delegates to inject(java.lang.String, groovy.lang.Closure) with a default name.
closure
- Takes a Map and returns a Collection<Map> that will be fed into the downstream steps

**intersect(Pipeline other, java.lang.Object columns)**
Returns a Pipeline with all of the rows from this Pipeline plus a single column "included" whose true/false value indicates whether the current row occurs in the given Pipeline, where occurrence means the values of the specified columns are equal in both Pipelines.
other
- The Pipeline whose rows are compared against the rows in this Pipeline.
columns
- The list of columns to check against (either a Map or a Collection).
Returns a Pipeline where each row from this Pipeline is present and a column "included" is added based on whether the row, according to the given columns, also occurs in the other Pipeline.

**join(Pipeline other, java.lang.Object columns, boolean left = false)**
Returns a Pipeline that joins the columns from this Pipeline with the given Pipeline where the columns are equal. It performs a left or inner join depending on the left parameter. A left join (left = true) returns the row even if no matching row is found on the right. An inner join (left = false, the default) does not return a row if no matching row is found on the right. Columns can be specified in 3 different ways: Map, Collection, or Object. Using a Map allows you to specify keys from both the left Pipeline and the right Pipeline, mapping column to column. For example, a Pipeline that has People objects on it might have an id column, and the right Pipeline has Hobbies which carry a person_id column. To map id -> person_id you'd use the following: people.join( hobbies, [id: "person_id"] ). That means match up rows such that people[id] = hobbies[person_id]. Using a Collection means the columns have the same name in both Pipelines. Using any other Object calls toString() on it and uses that as the column name shared by both Pipelines.
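The people/hobbies mapping described above can be sketched as follows; both `people` and `hobbies` are hypothetical Pipelines built elsewhere.

```groovy
people
    .join(hobbies, [id: "person_id"], true)  // left join: keep people without hobbies
    .printRow()
```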
other
- The right side Pipeline to use for the join
columns
- The columns to join on
left
- Perform a left join (true) or an inner join (false)

**json(java.lang.String filename, java.util.List columns = null)**
Writes out the rows produced to a JSON file.
filename
- The filename to save the JSON into.
columns
- Optional list of columns used to clip out the columns you want to include in the output. If left off, all columns are included.

**jsonl(java.lang.String filename, java.util.List columns = null)**
Writes out the rows produced into the given filename in JSON Lines format.
filename
- The filename to save the JSON lines into.
columns
- Optional list of columns used to clip out the columns you want to include in the output. If left off, all columns are included.

**jsonl(java.io.File file, java.util.List columns = null)**
Writes out the rows produced into the given File in JSON Lines format.
file
- The java.io.File to save the JSON lines into.
columns
- Optional list of columns used to clip out the columns you want to include in the output. If left off, all columns are included.

**limit(long limit, boolean halt = true)**
Limits the number of rows taken from a source to an upper bound. After the upper bound is hit it will either stop processing rows immediately or continue processing rows but reject the remainder, depending on whether halt is true or false, respectively. When rows are rejected, all steps before the limit are executed for all rows; all steps after the limit operation only process limit number of rows.
limit
- An upper limit on the number of rows this pipeline will process.
halt
- After limit has been exceeded, stop processing any additional rows (halt = true), or continue to process rows but reject all of them (halt = false).

**onRejection(groovy.lang.Closure configure)**
Takes a closure that is passed the rejection Pipeline. The closure can register steps on the rejection pipeline, and any rejections from the parent pipeline will be passed through the given rejection pipeline.
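limit and jsonl combine naturally when sampling a source; the filename is hypothetical and `pipeline` stands for any Pipeline wired to a source.

```groovy
pipeline
    .limit(100)              // halt the source after 100 rows (halt defaults to true)
    .jsonl("sample.jsonl")   // one JSON object per line
    .go()
```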
configure
- Closure that is passed the rejection pipeline

**prependStep(java.lang.CharSequence name = null, groovy.lang.Closure step)**
Prepends a step to the pipeline.
name
- The step name
step
- The code used to process each row on the Pipeline.

**printRow(java.lang.String... columns)**
Prints the values of the given columns for each row to the console, or all columns if no columns are given.
columns
- The names of the columns to print to the console

**process(java.util.Map row, int lineNumber = -1)**
Sends rows to the Pipeline for processing. Each row passed will start at the first step of the Pipeline and proceed through each step.
row
- The row to be processed by the Pipeline's steps.
lineNumber
- The line number from the Source to use when tracking this row through the Pipeline

**reduce(java.lang.String name, java.util.Map value, groovy.lang.Closure logic)**
Reduces all upstream rows into a value that is passed into the given closure (similar to Groovy's inject method). The downstream result is the value returned from the closure's final invocation. The downstream operators will see only one row object.
name
- The given name of this step
value
- The initial value passed into the given closure.
logic
- The closure that is invoked with the current value and the current row.

**reject(java.lang.String reason, RejectionCategory category = RejectionCategory.REJECTION)**
Helper method to create a Rejection object.
reason
- A text explanation of what caused the rejection
category
- The rejection category used to group this specific rejection

**removeField(java.lang.String fieldName, groovy.lang.Closure removeLogic = null)**
Removes a field based on whether the given closure returns true or false. The closure is optional; if not provided, the fieldName is always removed.
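reduce can be sketched as a per-craft counter; `pipeline` and the `craft` column are hypothetical, and downstream steps see only the single accumulated row.

```groovy
pipeline
    .reduce("countByCraft", [:]) { Map acc, Map row ->
        acc[row.craft] = (acc[row.craft] ?: 0) + 1
        return acc   // becomes the value passed to the next invocation
    }
    .printRow()      // prints the single accumulated row
```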
fieldName
- The name of the field to remove, depending on what the optional closure returns
removeLogic
- An optional closure that can return true or false to indicate whether to remove the field. If not provided, the field is always removed.

**renameFields(java.util.Map fieldNames)**
Renames a row's columns in the given map to the value of the corresponding key.
fieldNames
- The Map of source column names to renamed names.

**replaceAll(java.lang.String column, java.util.regex.Pattern regEx, java.lang.String withClause)**
Replaces all occurrences of the regular expression with the given withClause. The withClause can use $1, $2, $3, etc. to refer to groups within the regular expression, just like in the replaceAll method.
column
- The name of the column to pull the value from.
regEx
- The regular expression to use for replacing sections of the value.
withClause
- The replacement clause used to swap out the portion matched by the regular expression.

**replaceValues(java.lang.String column, java.util.Map values)**
Given a column, replaces all values at that column that match a key of the values Map with the corresponding value. This essentially transforms values[key] into values[value] and stores that in the given column.
column
- The column to compare against the value map keys.
values
- The key and value pairs that will be substituted given the value at the column

**save(java.lang.String filename, java.lang.String separator = ",", java.util.List columns = null)**
Writes each row to the specified filename as a CSV separated by the given separator. It can optionally select a subset of column names from each row. If unspecified, all columns are saved.
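replaceAll and replaceValues in one sketch; the `phone` and `craft` columns are hypothetical.

```groovy
pipeline
    .replaceAll("phone", ~/[^0-9]/, "")                            // strip non-digit characters
    .replaceValues("craft", [ISS: "International Space Station"])  // swap known values
```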
filename
- The filename to write the CSV file to
separator
- The field separator to use between each field value (default ",")
columns
- The list of fields to write from each row (default null)

**save(java.io.File csvFile, java.lang.String separator = ",", java.util.List columns = null)**
Writes each row to the specified file as a CSV separated by the given separator. It can optionally select a subset of column names from each row. If unspecified, all columns are saved.
csvFile
- The File to write the CSV file to
separator
- The field separator to use between each field value (default ",")
columns
- The list of fields to write from each row (default null)

**save(Sink sink)**
Writes the rows to the provided Sink. It returns the Pipeline that returns the results of the given sink.
sink
- The concrete Sink instance to save data to.

**setField(java.lang.String fieldName, java.lang.Object value)**
Sets a fieldName in each row to the given value.
fieldName
- The field name to set
value
- The value to set the field to

**sort(groovy.lang.Tuple2... columns)**
Returns a Pipeline where the rows are ordered by the given columns and sort order.
columns
- A list of Tuple2 where the first item is the column name and the second is the sort order.

**sort(java.lang.CharSequence name, groovy.lang.Closure configure)**
Sorts the rows according to a comparator configured by the given closure.
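The sort overloads can be sketched as follows. The `SortOrder.DESC` constant name is an assumption inferred from the Tuple2 signature above, not confirmed by this page.

```groovy
pipeline.sort("craft", "name")                      // ascending, values compared with <=>
pipeline.sort(new Tuple2("craft", SortOrder.DESC))  // explicit order per column (assumed constant)
```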
name
- The name identifying the step added to the pipeline for the sort.
configure
- A closure to configure the behavior of the sort, including the comparator used to sort the rows. The delegate is a SortConfig.

**sort(java.lang.String... columns)**
Returns a Pipeline where the rows are ordered by the given columns. The value of each column is compared using the <=> operator.
columns
- The columns to sort by

**source(Source source)**
Assigns a new source to a Pipeline and returns the pipeline.
source
- The Source to use as the Pipeline's source

**start()**
Starts processing rows from the source of the pipeline.

**trim()**
Returns a Pipeline where all white space is removed from all columns contained within the rows.

**unique(java.lang.String column)**
Only allows rows that are unique per the given column.
column
- The column name to use for checking uniqueness