This document is a reference guide to Nextflow channel factories and channel operators, with examples and use cases for each.
Introduction to Channel Factories
Channel factories are methods that create Nextflow channels from various data sources. Channels are the primary mechanism for passing data between processes in Nextflow.
Channel Characteristics
- Asynchronous: Data flows asynchronously through channels
- Immutable: Operators do not modify a channel in place; each operator returns a new channel
- Lazy: Channels are evaluated only when consumed
- Untyped: A channel can carry values of any type, including mixed types, and each value keeps its type
Basic Channel Factories
1. channel.of()
Creates a channel that emits each of its arguments as a separate element.
Syntax:
channel.of(value1, value2, value3, ...)
Examples:
// Simple values
ch_numbers = channel.of(1, 2, 3, 4, 5)
// Strings
ch_samples = channel.of('sample1', 'sample2', 'sample3')
// Mixed types
ch_mixed = channel.of('sample1', 100, true, '/path/to/file.txt')
// Tuples
ch_tuples = channel.of(['sample1', '/path/to/file1.fq'], ['sample2', '/path/to/file2.fq'])
// Empty channel
ch_empty = channel.of()
Use Cases:
- Creating test data
- Hardcoded reference files
- Configuration values
- Small, known datasets
2. channel.value()
Creates a value channel: a channel bound to a single value that can be read any number of times without being consumed.
Syntax:
channel.value(value)
Examples:
// Single value
ch_genome = channel.value('/path/to/genome.fasta')
// Map value
ch_config = channel.value([id: 'config', path: '/path/to/config.txt'])
// File value
ch_reference = channel.value(file('/path/to/reference.fasta', checkIfExists: true))
// Used for broadcasting to multiple processes
ch_genome = channel.value('/path/to/genome.fasta')
PROCESS1(ch_input, ch_genome)
PROCESS2(ch_input, ch_genome) // Same genome used in both
Use Cases:
- Reference files used by multiple processes
- Configuration values
- Single metadata values
- Broadcasting values to multiple processes
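The broadcasting behaviour can be sketched as follows. This is a minimal illustration rather than code from any pipeline: the GREET process and its inputs are hypothetical. Because ch_greeting is a value channel, it is reused for every element of the queue channel, so the process runs once per name.

```groovy
// Hypothetical process, for illustration only
process GREET {
    input:
    val name
    val greeting

    output:
    stdout

    script:
    """
    echo "${greeting}, ${name}"
    """
}

workflow {
    ch_names    = channel.of('sample1', 'sample2', 'sample3') // queue channel
    ch_greeting = channel.value('Hello')                      // value channel

    // GREET runs three times; the value channel is read again for each name
    GREET(ch_names, ch_greeting).view()
}
```

If ch_greeting were created with channel.of('Hello') instead, it would be a queue channel with one element and the process would run only once.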
3. channel.empty()
Creates an empty channel.
Syntax:
channel.empty()
Examples:
// Initialize empty channel for collection
ch_multiqc_files = channel.empty()
ch_versions = channel.empty()
// Conditional channel creation
ch_optional = params.use_optional ? channel.of('value') : channel.empty()
// Mix with empty channel
ch_results = ch_results.mix(channel.empty()) // No effect, but safe
// Warn when a channel turns out to be empty (channels have no isEmpty() method)
ch_data.count().subscribe { n ->
    if (n == 0) log.warn("No data available")
}
Use Cases:
- Initializing collection channels
- Conditional channel creation
- Placeholder for optional inputs
- Testing empty channel handling
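Empty channel handling can be tested with the ifEmpty() operator, which emits a fallback value only when the source channel emits nothing. A minimal sketch, with a placeholder string as the fallback:

```groovy
workflow {
    // A channel that never emits anything
    ch_optional = channel.empty()

    // ifEmpty() substitutes a default value when nothing was emitted
    ch_optional
        .ifEmpty('no optional input')
        .view()
    // Prints: no optional input
}
```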
File-Based Channel Factories
4. channel.fromPath()
Creates a channel from file paths.
Syntax:
channel.fromPath(path, options)
Options:
- checkIfExists: true - Verify file exists
- glob: true - Enable glob patterns
- type: 'file' or 'dir' - Specify type
- maxDepth: 1 - Maximum directory depth for glob
- followLinks: true - Follow symbolic links
Basic Examples:
// Single file
ch_fasta = channel.fromPath('/path/to/genome.fasta', checkIfExists: true)
// Multiple files
ch_files = channel.fromPath(['/path/to/file1.txt', '/path/to/file2.txt'])
// Glob pattern
ch_fastq = channel.fromPath('/data/*.fastq.gz', checkIfExists: true)
// Recursive glob
ch_all_fastq = channel.fromPath('/data/**/*.fastq.gz', checkIfExists: true)
// Specific file type
ch_bam_files = channel.fromPath('/data/*.bam', type: 'file', checkIfExists: true)
// Directory
ch_dirs = channel.fromPath('/data/samples/*', type: 'dir', checkIfExists: true)
// With maxDepth
ch_shallow = channel.fromPath('/data/**/*.txt', maxDepth: 2, checkIfExists: true)
// From parameter
ch_input = channel.fromPath(params.input, checkIfExists: true)
Advanced Examples:
// Multiple patterns
ch_mixed = channel.fromPath([
'/data/*.fastq.gz',
'/data/*.bam',
'/data/*.vcf'
], checkIfExists: true)
// Exclude patterns (using filter)
ch_fastq = channel.fromPath('/data/*.fastq.gz', checkIfExists: true)
.filter { !it.name.contains('unmapped') }
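// Sorted by modification time, most recent first
// (channels have no sort() or reverse() operator; toSortedList() emits one sorted list)
ch_recent = channel.fromPath('/data/*.fastq.gz', checkIfExists: true)
    .toSortedList { a, b -> b.lastModified() <=> a.lastModified() }
    .flatMap { it } // Emit the sorted files one by one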
// Creating channels from CSV/TSV files
ch_samplesheet = channel.fromPath('samplesheet.csv', checkIfExists: true)
.splitCsv(header: true, sep: ',')
.map { row -> [row.sample, row.fastq_1, row.fastq_2] }
// With validation
ch_samplesheet = channel.fromPath('samplesheet.csv', checkIfExists: true)
.splitCsv(header: true, sep: ',')
.map { row ->
def meta = [id: row.sample, type: row.type]
def files = row.fastq_2 ? [row.fastq_1, row.fastq_2] : [row.fastq_1]
[meta, files]
}
// TSV file
ch_data = channel.fromPath('data.tsv', checkIfExists: true)
.splitCsv(header: true, sep: '\t')
.map { row -> [row.id, row.value] }
// Creating channels from JSON files
// (place the import at the top of the script)
import groovy.json.JsonSlurper
ch_json = channel.fromPath('config.json', checkIfExists: true)
    .map { json_file ->
        def json = new JsonSlurper().parseText(json_file.text)
        [json.sample_id, json.fastq_path]
    }
Use Cases:
- Reading input files
- Finding files matching patterns
- Loading reference data
- Processing directory contents
- Parsing structured data files (CSV, TSV, JSON)
5. channel.fromFilePairs()
Creates a channel from file pairs (e.g., paired-end FASTQ files).
Syntax:
channel.fromFilePairs(pattern, options)
Options:
- checkIfExists: true - Verify files exist
- size: 2 - Number of files per group (default 2; use -1 for any number)
- flat: true - Flatten tuple structure
Examples:
// Standard paired-end pattern (_1/_2 suffix)
ch_paired = channel.fromFilePairs('/data/*_{1,2}.fastq.gz', checkIfExists: true)
// Emits: [sample_id, [file1, file2]]
// Custom pattern (R1/R2)
ch_paired = channel.fromFilePairs('/data/*_{R1,R2}.fastq.gz', checkIfExists: true)
// Groups of three files per sample
ch_triplets = channel.fromFilePairs('/data/*_{1,2,3}.fastq.gz', size: 3, checkIfExists: true)
// Emits: [sample_id, [file1, file2, file3]]
// Flat structure
ch_flat = channel.fromFilePairs('/data/*_{1,2}.fastq.gz', flat: true, checkIfExists: true)
// Emits: [sample_id, file1, file2] (no nested list)
// With metadata
ch_paired.map { id, files -> [id: id, single_end: false, files: files] }
Use Cases:
- Paired-end sequencing data
- Multiple related files per sample
- Grouping files by sample ID
- Processing file pairs together
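When the sample ID cannot be derived from the glob pattern alone, fromFilePairs() also accepts a closure that computes the grouping key for each file. The sketch below assumes a hypothetical directory layout (for example /data/sampleA.sorted.bam next to /data/sampleA.sorted.bam.bai) and groups on the file name up to the first dot.

```groovy
workflow {
    // Hypothetical layout: /data/sampleA.sorted.bam, /data/sampleA.sorted.bam.bai, ...
    // size: -1 accepts any number of files per group
    channel
        .fromFilePairs('/data/*.bam*', size: -1) { f -> f.simpleName }
        .view()
    // Each element has the form [key, [files sharing that key]]
}
```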
6. channel.fromSRA()
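Queries the NCBI SRA database for the given project or run accession IDs and emits the matching FASTQ files as remote file links. An NCBI API key is needed, supplied through the apiKey option or the NCBI_API_KEY environment variable.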
Syntax:
channel.fromSRA(run_ids, options)
Examples:
// Single SRA ID
ch_sra = channel.fromSRA('SRR123456')
// Multiple SRA IDs
ch_sra = channel.fromSRA(['SRR123456', 'SRR123457', 'SRR123458'])
// From file (fromSRA takes a string or a list, not a channel, so read the IDs directly)
def sra_ids = file('sra_ids.txt', checkIfExists: true)
    .readLines()
    .collect { it.trim() }
    .findAll { it }
ch_sra = channel.fromSRA(sra_ids)
Use Cases:
- Downloading data from SRA
- Processing public sequencing data
- Batch SRA downloads
Data Structure Channel Factories
7. channel.fromList()
Creates a channel from a Groovy list.
Syntax:
channel.fromList(list)
Examples:
// Simple list
ch_list = channel.fromList([1, 2, 3, 4, 5])
// List of tuples
ch_samples = channel.fromList([
['sample1', '/path/to/file1.fq'],
['sample2', '/path/to/file2.fq'],
['sample3', '/path/to/file3.fq']
])
// List of maps
ch_metadata = channel.fromList([
[id: 'sample1', type: 'riboseq', condition: 'control'],
[id: 'sample2', type: 'riboseq', condition: 'treated'],
[id: 'sample3', type: 'rnaseq', condition: 'control']
])
// From variable
def samples = ['sample1', 'sample2', 'sample3']
ch_samples = channel.fromList(samples)
// Nested lists
ch_nested = channel.fromList([
['sample1', ['file1_1.fq', 'file1_2.fq']],
['sample2', ['file2_1.fq', 'file2_2.fq']]
])
Use Cases:
- Converting lists to channels
- Hardcoded sample lists
- Programmatically generated data
- Small datasets
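For the "programmatically generated data" case, a list can be built with ordinary Groovy code and then handed to fromList(). A small sketch with made-up sample names:

```groovy
workflow {
    // Build the list with plain Groovy, then turn it into a channel
    def sample_ids = (1..3).collect { i -> "sample${i}" }

    channel
        .fromList(sample_ids)
        .view()
    // Prints sample1, sample2 and sample3, one per line
}
```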
Channel Operators
Channel operators are methods that transform, filter, combine, or manipulate channels. They are grouped by functionality below.
Transformation Operators
Transform elements or channel structure.
5. map()
Transforms each element in a channel.
Examples:
// Simple transformation
ch_numbers = channel.of(1, 2, 3, 4, 5)
ch_doubled = ch_numbers.map { it * 2 }
// Emits: 2, 4, 6, 8, 10
// Extract file name
ch_files = channel.fromPath('/data/*.fastq.gz')
ch_names = ch_files.map { it.name }
// Emits: file1.fastq.gz, file2.fastq.gz, ...
// Add metadata
ch_files = channel.fromPath('/data/*.fastq.gz')
ch_with_meta = ch_files.map { file ->
[id: file.baseName, file: file]
}
// Complex transformation
ch_samples = channel.fromList(['sample1', 'sample2'])
ch_processed = ch_samples.map { sample ->
[
id: sample,
fastq_1: "/data/${sample}_R1.fastq.gz",
fastq_2: "/data/${sample}_R2.fastq.gz",
type: 'riboseq'
]
}
6. filter()
Filters elements based on a condition.
Examples:
// Filter by value
ch_numbers = channel.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
ch_even = ch_numbers.filter { it % 2 == 0 }
// Emits: 2, 4, 6, 8, 10
// Filter files by extension
ch_files = channel.fromPath('/data/*')
ch_fastq = ch_files.filter { it.name.endsWith('.fastq.gz') }
// Filter by metadata
ch_samples = channel.fromList([
[id: 's1', type: 'riboseq'],
[id: 's2', type: 'rnaseq'],
[id: 's3', type: 'riboseq']
])
ch_riboseq = ch_samples.filter { it.type == 'riboseq' }
// Emits: [id: 's1', type: 'riboseq'], [id: 's3', type: 'riboseq']
// Filter tuples
ch_data = channel.of(
['sample1', 'riboseq', '/path/to/file1.fq'],
['sample2', 'rnaseq', '/path/to/file2.fq'],
['sample3', 'riboseq', '/path/to/file3.fq']
)
ch_riboseq = ch_data.filter { sample, type, fastq -> type == 'riboseq' }
7. flatMap()
Maps each element to multiple elements and flattens the result.
Examples:
// Expand list elements
ch_lists = channel.of([1, 2], [3, 4], [5, 6])
ch_flat = ch_lists.flatMap { it }
// Emits: 1, 2, 3, 4, 5, 6
// Generate multiple outputs per input
ch_samples = channel.of('sample1', 'sample2')
ch_files = ch_samples.flatMap { sample ->
[
"${sample}_R1.fastq.gz",
"${sample}_R2.fastq.gz",
"${sample}_R3.fastq.gz"
]
}
// Emits: sample1_R1.fastq.gz, sample1_R2.fastq.gz, sample1_R3.fastq.gz, sample2_R1.fastq.gz, sample2_R2.fastq.gz, sample2_R3.fastq.gz
// Process directory contents
ch_dirs = channel.fromPath('/data/samples/*', type: 'dir')
ch_files = ch_dirs.flatMap { dir ->
dir.listFiles().findAll { it.name.endsWith('.fastq.gz') }
}
Grouping and Combining Operators
Group elements or combine multiple channels.
8. groupTuple()
Groups elements by a key (first element by default).
Examples:
// Group by first element
ch_data = channel.of(
['sample1', 'file1.fq'],
['sample1', 'file2.fq'],
['sample2', 'file3.fq'],
['sample2', 'file4.fq']
)
ch_grouped = ch_data.groupTuple()
// Emits: ['sample1', ['file1.fq', 'file2.fq']], ['sample2', ['file3.fq', 'file4.fq']]
// Group by specific index
ch_data = channel.of(
['sample1', 'lane1', 'file1.fq'],
['sample1', 'lane2', 'file2.fq'],
['sample2', 'lane1', 'file3.fq']
)
ch_by_sample = ch_data.groupTuple(by: 0)
// Groups by sample (first element)
// Group by metadata key
ch_data = channel.of(
[[id: 's1', type: 'riboseq'], 'file1.fq'],
[[id: 's1', type: 'riboseq'], 'file2.fq'],
[[id: 's2', type: 'rnaseq'], 'file3.fq']
)
ch_grouped = ch_data.groupTuple(by: 0)
// Groups by metadata map
9. join()
Joins two channels by matching keys.
Examples:
// Join by first element (default)
ch_samples = channel.of(['sample1', 'data1'], ['sample2', 'data2'])
ch_metadata = channel.of(['sample1', 'meta1'], ['sample2', 'meta2'])
ch_joined = ch_samples.join(ch_metadata)
// Emits: ['sample1', 'data1', 'meta1'], ['sample2', 'data2', 'meta2']
// Join by specific index
ch_samples = channel.of(['sample1', 'data1'], ['sample2', 'data2'])
ch_metadata = channel.of(['sample1', 'type1', 'meta1'])
ch_joined = ch_samples.join(ch_metadata, by: 0)
// Joins on first element
// Emits: ['sample1', 'data1', 'type1', 'meta1'] (sample2 has no match and is dropped)
// Join with multiple keys
ch_data1 = channel.of(['sample1', 'lane1', 'data1'])
ch_data2 = channel.of(['sample1', 'lane1', 'data2'])
ch_joined = ch_data1.join(ch_data2, by: [0, 1])
// Joins on sample and lane
// Join with metadata
ch_bam = channel.of(
[[id: 's1'], '/path/to/s1.bam'],
[[id: 's2'], '/path/to/s2.bam']
)
ch_index = channel.of(
[[id: 's1'], '/path/to/s1.bai'],
[[id: 's2'], '/path/to/s2.bai']
)
ch_joined = ch_bam.join(ch_index, by: 0)
// Emits: [[id: 's1'], '/path/to/s1.bam', '/path/to/s1.bai'], ...
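By default join() silently drops elements whose key has no partner in the other channel. The remainder option keeps them, filling the missing side with null. A minimal sketch with made-up keys and values:

```groovy
workflow {
    ch_left  = channel.of(['s1', 'a'], ['s2', 'b'])
    ch_right = channel.of(['s1', 'x'])

    // remainder: true also emits keys found in only one channel
    ch_left
        .join(ch_right, remainder: true)
        .view()
    // Prints:
    // [s1, a, x]
    // [s2, b, null]
}
```

Checking for null downstream (or using failOnMismatch: true instead) makes unmatched samples visible rather than letting them disappear.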
10. combine()
Combines channels creating a Cartesian product.
Examples:
// Combine two channels
ch_samples = channel.of('sample1', 'sample2')
ch_treatments = channel.of('control', 'treated')
ch_combined = ch_samples.combine(ch_treatments)
// Emits: ['sample1', 'control'], ['sample1', 'treated'], ['sample2', 'control'], ['sample2', 'treated']
// Combine multiple channels
ch_samples = channel.of('s1', 's2')
ch_treatments = channel.of('ctrl', 'trt')
ch_replicates = channel.of('rep1', 'rep2')
ch_all = ch_samples.combine(ch_treatments).combine(ch_replicates)
// Creates all combinations
// Combine on a shared key
ch_fasta = channel.of(['ref1', '/path/to/ref1.fa'])
ch_gtf = channel.of(['ref1', '/path/to/ref1.gtf'])
ch_combined = ch_fasta.combine(ch_gtf, by: 0)
// Emits: ['ref1', '/path/to/ref1.fa', '/path/to/ref1.gtf']
11. mix()
Mixes multiple channels into one.
Examples:
// Mix two channels
ch_channel1 = channel.of('item1', 'item2')
ch_channel2 = channel.of('item3', 'item4')
ch_mixed = ch_channel1.mix(ch_channel2)
// Emits: item1, item2, item3, item4 (order may vary)
// Mix multiple channels
ch_versions = channel.empty()
ch_versions = ch_versions.mix(PROCESS1.out.versions)
ch_versions = ch_versions.mix(PROCESS2.out.versions)
ch_versions = ch_versions.mix(PROCESS3.out.versions)
// Mix with empty channel (safe)
ch_results = ch_results.mix(channel.empty())
Branching and Splitting Operators
Split channels into multiple channels based on conditions.
12. branch()
Branches a channel into multiple channels based on conditions.
Examples:
// Branch by value
ch_samples = channel.of('sample1', 'sample2', 'sample3')
ch_branched = ch_samples.branch {
s1: it == 'sample1'
return it
s2: it == 'sample2'
return it
// 'default' is a reserved Groovy keyword, so the catch-all branch uses another label
other: true
return it
}
// Creates: ch_branched.s1, ch_branched.s2, ch_branched.other
// Branch by metadata
ch_data = channel.of(
[[id: 's1', type: 'riboseq'], 'file1.fq'],
[[id: 's2', type: 'rnaseq'], 'file2.fq'],
[[id: 's3', type: 'riboseq'], 'file3.fq']
)
ch_by_type = ch_data.branch { meta, file ->
riboseq: meta.type == 'riboseq'
return [meta, file]
rnaseq: meta.type == 'rnaseq'
return [meta, file]
}
// Creates: ch_by_type.riboseq, ch_by_type.rnaseq
// Branch by file type
ch_files = channel.fromPath('/data/*')
ch_by_ext = ch_files.branch { file ->
fastq: file.name.endsWith('.fastq.gz')
return file
bam: file.name.endsWith('.bam')
return file
other: true
return file
}
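Each label in a branch closure becomes a separate output channel that is accessed by name. The sketch below uses made-up numeric values; each element goes to the first branch whose condition is true.

```groovy
workflow {
    ch_numbers = channel.of(1, 5, 20, 50)

    ch_sized = ch_numbers.branch { n ->
        small: n < 10
        large: n >= 10
    }

    // Each branch is its own channel
    ch_sized.small.view { n -> "small: ${n}" }
    ch_sized.large.view { n -> "large: ${n}" }
    // 1 and 5 arrive on "small", 20 and 50 on "large";
    // lines from the two channels may interleave
}
```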
13. multiMap()
Maps one channel to multiple output channels.
Examples:
// Split into multiple channels
ch_data = channel.of(
['sample1', 'data1', 'meta1'],
['sample2', 'data2', 'meta2']
)
ch_split = ch_data.multiMap { sample, data, meta ->
samples: sample
data: data
metadata: meta
}
// Creates: ch_split.samples, ch_split.data, ch_split.metadata
// Complex splitting
ch_bams = channel.of(
[[id: 's1'], '/path/to/s1.bam', '/path/to/s1.bai'],
[[id: 's2'], '/path/to/s2.bam', '/path/to/s2.bai']
)
ch_split = ch_bams.multiMap { meta, bam, bai ->
bam: [meta, bam, bai]
bam_only: [meta, bam]
bai_only: [meta, bai]
}
Structure Manipulation Operators
Modify channel structure or element order.
14. transpose()
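Flattens tuples that contain list elements, so that each list item is emitted in its own tuple. It is the inverse of groupTuple().
Examples:
// Expand nested file lists
ch_data = channel.of(
    ['sample1', ['file1.fq', 'file2.fq']],
    ['sample2', ['file3.fq', 'file4.fq']]
)
ch_transposed = ch_data.transpose()
// Emits: ['sample1', 'file1.fq'], ['sample1', 'file2.fq'], ['sample2', 'file3.fq'], ['sample2', 'file4.fq']
// Transpose with metadata
ch_files = channel.of(
    [[id: 's1'], ['file1.fq', 'file2.fq']],
    [[id: 's2'], ['file3.fq', 'file4.fq']]
)
ch_transposed = ch_files.transpose()
// Emits: [[id: 's1'], 'file1.fq'], [[id: 's1'], 'file2.fq'], [[id: 's2'], 'file3.fq'], [[id: 's2'], 'file4.fq']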
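The relationship between groupTuple() and transpose() can be seen in a round trip: grouping collects the files of each sample into a list, and transposing expands that list back into one tuple per file. A small sketch with made-up file names:

```groovy
workflow {
    ch_flat = channel.of(
        ['sample1', 'file1.fq'],
        ['sample1', 'file2.fq'],
        ['sample2', 'file3.fq']
    )

    // One tuple per sample, with the files gathered into a list
    ch_grouped = ch_flat.groupTuple()
    ch_grouped.view { t -> "grouped: ${t}" }

    // Back to one tuple per file
    ch_grouped.transpose().view { t -> "transposed: ${t}" }
}
```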
15. unique()
Removes duplicate elements.
Examples:
// Remove duplicates
ch_duplicates = channel.of('a', 'b', 'a', 'c', 'b', 'd')
ch_unique = ch_duplicates.unique()
// Emits: 'a', 'b', 'c', 'd'
// Unique by key (unique() takes a closure that returns the comparison key)
ch_data = channel.of(
    ['sample1', 'data1'],
    ['sample1', 'data2'],
    ['sample2', 'data3']
)
ch_unique_samples = ch_data.unique { it[0] }
// Emits: ['sample1', 'data1'], ['sample2', 'data3'] (first occurrence kept)
Selection Operators
Select specific elements from channels or collect all elements.
16. first()
Takes the first element from a channel.
Examples:
// Get first element
ch_data = channel.of('item1', 'item2', 'item3')
ch_first = ch_data.first()
// Emits: 'item1'
// First with condition
ch_data = channel.of(
['sample1', 'type1'],
['sample2', 'type2'],
['sample3', 'type1']
)
ch_first_type1 = ch_data.filter { it[1] == 'type1' }.first()
// Emits: ['sample1', 'type1']
17. last()
Takes the last element from a channel.
Examples:
// Get last element
ch_data = channel.of('item1', 'item2', 'item3')
ch_last = ch_data.last()
// Emits: 'item3'
18. take()
Takes the first N elements.
Examples:
// Take first 5 elements
ch_data = channel.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
ch_first5 = ch_data.take(5)
// Emits: 1, 2, 3, 4, 5
Collection and Aggregation Operators
Collect or aggregate channel elements.
20. collect()
Collects all elements from a channel into a single list.
Examples:
// Collect all elements
ch_data = channel.of(1, 2, 3, 4, 5)
ch_collected = ch_data.collect()
// Emits: [1, 2, 3, 4, 5] (single list)
// Collect with grouping
ch_samples = channel.of(
['sample1', 'file1.fq'],
['sample1', 'file2.fq'],
['sample2', 'file3.fq']
)
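ch_grouped = ch_samples.groupTuple().collect(flat: false)
// Emits: [['sample1', ['file1.fq', 'file2.fq']], ['sample2', ['file3.fq']]]
// (flat: false keeps each tuple intact; by default collect() flattens the tuples into one list)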
// Collect for MultiQC
ch_multiqc_files = channel.empty()
ch_multiqc_files = ch_multiqc_files.mix(PROCESS1.out.html)
ch_multiqc_files = ch_multiqc_files.mix(PROCESS2.out.html)
MULTIQC(ch_multiqc_files.collect())
Use Cases:
- Collecting all channel elements for a single process execution
- Preparing inputs for tools that require all data at once (e.g., MultiQC)
- Converting channel to list for further processing
Topic Channels
Topic channels collect outputs from many processes under a shared name, without manual mixing. The feature was introduced as a preview in Nextflow 23.11.0-edge and is stable from 25.04; earlier releases need nextflow.preview.topic = true at the top of the script.
21. topic
Process outputs are sent to a topic with the topic: output option. The workflow reads everything sent to that topic with channel.topic().
Examples:
// Emit to topic channel
process PROCESS1 {
    output:
    path "versions.yml", topic: versions
    script:
    """
    # ... process code ...
    """
}
process PROCESS2 {
    output:
    path "versions.yml", topic: versions
    script:
    """
    # ... process code ...
    """
}
// In the workflow, read the topic with channel.topic()
workflow {
    PROCESS1()
    PROCESS2()
    // Contains every versions.yml file sent to the "versions" topic
    ch_versions = channel.topic('versions')
}
// Multiple topics
process PROCESS {
    output:
    path "output1.txt", topic: results
    path "output2.txt", topic: logs
    script:
    """
    # ... process code ...
    """
}
workflow {
    PROCESS()
    ch_results = channel.topic('results') // All output1.txt files
    ch_logs = channel.topic('logs') // All output2.txt files
}
Use Cases:
- Automatic collection of version files from multiple processes
- Grouping related outputs without manual mixing
- Simplifying channel management in complex workflows
Text Processing Operators
Process text-based data formats.
22. view()
Prints channel elements to console (for debugging).
Examples:
// View channel contents
ch_data = channel.of('item1', 'item2', 'item3')
ch_data.view()
// Prints: item1, item2, item3
// View with custom message
ch_samples = channel.of('sample1', 'sample2')
ch_samples.view { "Processing: ${it}" }
// Prints: Processing: sample1, Processing: sample2
// View tuples
ch_data = channel.of(['sample1', 'file1.fq'], ['sample2', 'file2.fq'])
ch_data.view { meta, file -> "Sample: ${meta}, File: ${file}" }
// Prints: Sample: sample1, File: file1.fq, etc.
Use Cases:
- Debugging channel contents
- Monitoring workflow progress
- Inspecting data flow
23. splitCsv()
Splits CSV/TSV files into rows.
Examples:
// Split CSV file
ch_csv = channel.fromPath('samplesheet.csv', checkIfExists: true)
ch_rows = ch_csv.splitCsv()
// Emits each row as a list
// Split CSV with header
ch_csv = channel.fromPath('samplesheet.csv', checkIfExists: true)
ch_rows = ch_csv.splitCsv(header: true)
// Emits each row as a map with column names as keys
// Split TSV
ch_tsv = channel.fromPath('data.tsv', checkIfExists: true)
ch_rows = ch_tsv.splitCsv(header: true, sep: '\t')
// Split with custom separator
ch_data = channel.fromPath('data.txt', checkIfExists: true)
ch_rows = ch_data.splitCsv(sep: '|')
// Process CSV rows
ch_csv = channel.fromPath('samplesheet.csv', checkIfExists: true)
ch_samples = ch_csv.splitCsv(header: true, sep: ',')
.map { row ->
def meta = [id: row.sample, type: row.type]
def files = row.fastq_2 ? [row.fastq_1, row.fastq_2] : [row.fastq_1]
[meta, files]
}
Use Cases:
- Parsing samplesheet files
- Processing tabular data
- Converting CSV to channel format
24. splitText()
Splits text files into lines.
Examples:
// Split text file into lines
ch_text = channel.fromPath('data.txt', checkIfExists: true)
ch_lines = ch_text.splitText()
// Emits each line as a separate element
// Split into chunks of several lines (by takes a line count, not a separator)
ch_text = channel.fromPath('data.txt', checkIfExists: true)
ch_chunks = ch_text.splitText(by: 10) // Each element holds 10 lines
// Split with limit
ch_text = channel.fromPath('data.txt', checkIfExists: true)
ch_lines = ch_text.splitText(limit: 100) // First 100 lines
// Process lines
ch_ids = channel.fromPath('sample_ids.txt', checkIfExists: true)
ch_samples = ch_ids.splitText()
.map { it.trim() }
.filter { it != '' && !it.startsWith('#') }
Use Cases:
- Processing line-by-line data
- Parsing configuration files
- Reading lists of IDs or paths
Ordering Operators
Sort and reorder channel elements.
25. toSortedList()
Nextflow has no sort() operator. toSortedList() collects all elements and emits them as a single sorted list; add flatMap() to emit the sorted elements one by one.
Examples:
// Sort by value
ch_data = channel.of(3, 1, 4, 1, 5, 9, 2, 6)
ch_sorted = ch_data.toSortedList()
// Emits: [1, 1, 2, 3, 4, 5, 6, 9] (single list)
// Emit the sorted elements individually
ch_sorted_items = ch_data.toSortedList().flatMap { it }
// Emits: 1, 1, 2, 3, 4, 5, 6, 9
// Sort by file name
ch_files = channel.fromPath('/data/*.fastq.gz')
ch_sorted = ch_files.toSortedList { a, b -> a.name <=> b.name }
// Sort by modification time
ch_sorted = ch_files.toSortedList { a, b -> a.lastModified() <=> b.lastModified() }
// Sort tuples
ch_data = channel.of(
    ['sample2', 'file2.fq'],
    ['sample1', 'file1.fq'],
    ['sample3', 'file3.fq']
)
ch_sorted = ch_data.toSortedList { a, b -> a[0] <=> b[0] } // Sort by first element
// Emits: [['sample1', 'file1.fq'], ['sample2', 'file2.fq'], ['sample3', 'file3.fq']]
Use Cases:
- Ordering files chronologically
- Sorting samples alphabetically
- Preparing ordered inputs
26. Reversing order
Nextflow has no reverse() operator. Collect the elements with toList() or toSortedList() and reverse the resulting Groovy list.
Examples:
// Reverse order
ch_data = channel.of(1, 2, 3, 4, 5)
ch_reversed = ch_data.toList().flatMap { it.reverse() }
// Emits: 5, 4, 3, 2, 1
// Reverse after sorting
ch_files = channel.fromPath('/data/*.fastq.gz')
ch_recent_first = ch_files
    .toSortedList { a, b -> b.lastModified() <=> a.lastModified() }
    .flatMap { it }
// Most recent files first
Use Cases:
- Getting most recent files first
- Reversing processing order
- Last-in-first-out processing
Flow Control Operators
Control channel flow and buffering.
27. buffer()
Buffers channel elements into groups.
Examples:
// Buffer by size
ch_data = channel.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
ch_buffered = ch_data.buffer(size: 3, remainder: true)
// Emits: [1, 2, 3], [4, 5, 6], [7, 8, 9], [10] (without remainder: true the incomplete [10] is dropped)
// Buffer until a closing condition (buffer() has no time option)
ch_data = channel.of(1, 2, 3, 1, 2, 3)
ch_buffered = ch_data.buffer { it == 2 }
// Emits: [1, 2], [3, 1, 2]
// Buffer between an opening and a closing condition
ch_data = channel.of(1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2)
ch_buffered = ch_data.buffer(2, 4)
// Emits: [2, 3, 4], [2, 3, 4]
Use Cases:
- Batching operations
- Grouping elements for batch processing
- Collecting elements between start and end markers
28. until()
Emits elements until a condition is met.
Examples:
// Until condition
ch_data = channel.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
ch_until = ch_data.until { it > 5 }
// Emits: 1, 2, 3, 4, 5 (stops when value > 5)
// Stop at the first empty file
ch_files = channel.fromPath('/data/*.fastq.gz')
ch_until = ch_files.until { it.size() == 0 }
Use Cases:
- Conditional processing
- Early termination
- Processing until condition met
29. Repeating elements
Nextflow has no repeat() operator. Use flatMap() to emit several copies of each element, or combine() to pair every element with every element of another channel.
Examples:
// Repeat each element three times
ch_data = channel.of('sample1', 'sample2')
ch_repeated = ch_data.flatMap { [it] * 3 }
// Emits: sample1, sample1, sample1, sample2, sample2, sample2
// Pair every sample with every treatment
ch_samples = channel.of('sample1', 'sample2')
ch_treatments = channel.of('control', 'treated')
ch_combined = ch_samples.combine(ch_treatments)
// Creates all sample-treatment combinations
Use Cases:
- Repeating values for combination
- Creating multiple copies
- Expanding channel for cross-products
30. concat()
Concatenates multiple channels sequentially.
Examples:
// Concatenate channels
ch1 = channel.of('a', 'b', 'c')
ch2 = channel.of('d', 'e', 'f')
ch_concat = ch1.concat(ch2)
// Emits: a, b, c, d, e, f (in order)
// Concatenate multiple channels
ch1 = channel.of(1, 2)
ch2 = channel.of(3, 4)
ch3 = channel.of(5, 6)
ch_all = ch1.concat(ch2).concat(ch3)
// Emits: 1, 2, 3, 4, 5, 6
Use Cases:
- Sequential processing
- Maintaining order
- Combining channels in sequence
Utility Operators
Utility functions for channel operations.
31. count()
Counts the number of elements in a channel.
Examples:
// Count elements
ch_data = channel.of(1, 2, 3, 4, 5)
ch_count = ch_data.count()
// Emits: 5
// Count with condition
ch_files = channel.fromPath('/data/*.fastq.gz')
ch_count = ch_files.filter { it.size() > 0 }.count()
// Counts non-empty files
Use Cases:
- Validating channel size
- Counting elements
- Checking data availability
Debugging Operators
Utilities for debugging and inspecting channels.
19. dump()
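Prints channel contents for debugging, but only when the run is started with the -dump-channels option.
Examples:
// Debug channel contents
ch_data = channel.of('item1', 'item2', 'item3')
ch_data.dump(tag: 'data')
// Prints the channel contents to the console when -dump-channels is set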
Common Patterns and Use Cases
Pattern 1: Creating Input Channels from Samplesheets
// From CSV samplesheet
channel
.fromPath(params.input, checkIfExists: true)
.splitCsv(header: true, sep: ',')
.map { row ->
def meta = [
id: row.sample,
type: row.type,
condition: row.condition
]
def files = row.fastq_2 ?
[file(row.fastq_1, checkIfExists: true), file(row.fastq_2, checkIfExists: true)] :
[file(row.fastq_1, checkIfExists: true)]
[meta, files]
}
.set { ch_fastq }
Pattern 2: Grouping Files by Sample
// Group multiple files per sample
channel
.fromFilePairs('/data/*_{1,2}.fastq.gz', checkIfExists: true)
.map { sample_id, files ->
[
[id: sample_id, single_end: false],
files
]
}
.set { ch_paired_fastq }
Pattern 3: Combining Reference Files
// Combine FASTA and GTF
ch_fasta = channel.value(file(params.fasta, checkIfExists: true))
ch_gtf = channel.value(file(params.gtf, checkIfExists: true))
ch_fasta_gtf = ch_fasta.combine(ch_gtf)
.map { fasta, gtf -> [[:], fasta, gtf] }
.first()
Pattern 4: Branching by Sample Type
// Branch samples by type
ch_samples
.branch { meta, files ->
riboseq: meta.type == 'riboseq'
return [meta, files]
rnaseq: meta.type == 'rnaseq'
return [meta, files]
tiseq: meta.type == 'tiseq'
return [meta, files]
}
.set {
ch_samples_by_type
}
Pattern 5: Joining BAMs with Indexes
// Join BAM files with their indexes
ch_bam = channel.of(
[[id: 's1'], '/path/to/s1.bam'],
[[id: 's2'], '/path/to/s2.bam']
)
ch_bai = channel.of(
[[id: 's1'], '/path/to/s1.bai'],
[[id: 's2'], '/path/to/s2.bai']
)
ch_bam_indexed = ch_bam.join(ch_bai, by: 0)
// Emits: [[id: 's1'], '/path/to/s1.bam', '/path/to/s1.bai'], ...
Pattern 6: Collecting Versions
// Initialize and collect versions
ch_versions = channel.empty()
ch_versions = ch_versions.mix(PROCESS1.out.versions)
ch_versions = ch_versions.mix(PROCESS2.out.versions)
ch_versions = ch_versions.mix(PROCESS3.out.versions)
ch_versions = ch_versions.filter { it != null }
Pattern 7: Transforming for Module Inputs
// Add metadata wrapper for modules
ch_files = channel.fromPath('/data/*.fastq.gz')
ch_for_module = ch_files.map { file ->
[[id: file.baseName], file]
}
// Transform reference files
ch_fasta = channel.value('/path/to/genome.fasta')
ch_for_module = ch_fasta.map { fasta -> [[:], fasta] }
Pattern 8: Filtering and Validation
// Filter and validate
ch_files = channel.fromPath('/data/*.fastq.gz', checkIfExists: true)
ch_valid = ch_files
.filter { it.size() > 0 } // Non-empty files
.filter { it.name.contains('_R1') || it.name.contains('_R2') } // Paired-end pattern
.map { file ->
if (!file.exists()) {
log.error("File not found: ${file}")
return null
}
file
}
.filter { it != null }
Pattern 9: Conditional Channel Creation
// Create channels conditionally
ch_optional = params.use_optional ?
channel.fromPath('/data/optional.txt', checkIfExists: true) :
channel.empty()
// Multiple conditionals
ch_data = params.input_type == 'file' ?
channel.fromPath(params.input, checkIfExists: true) :
params.input_type == 'list' ?
channel.fromList(params.input.tokenize(',')) : // tokenize() returns a List; split() returns an array
channel.empty()
// Note: channel.from() is deprecated. Use channel.of() or channel.fromList() instead.
Pattern 10: Splitting and Recombining
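// Split, process, recombine
// process_sample() and get_metadata() stand in for user-defined functions
ch_samples = channel.of('sample1', 'sample2', 'sample3')
ch_split = ch_samples.multiMap { sample ->
    processed: [sample, process_sample(sample)]
    metadata: [sample, get_metadata(sample)]
}
// join() matches the two outputs by sample; combine() would emit every pairing
ch_recombined = ch_split.processed
    .join(ch_split.metadata)
    .map { sample, processed, meta -> [meta, processed] }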
Best Practices
1. Always Use checkIfExists: true for File Paths
// Good
ch_files = channel.fromPath('/data/*.fastq.gz', checkIfExists: true)
// Avoid
ch_files = channel.fromPath('/data/*.fastq.gz') // No validation
2. Use Descriptive Channel Names
// Good
ch_fastq_paired = channel.fromFilePairs('/data/*_{1,2}.fastq.gz')
ch_genome_fasta = channel.value('/path/to/genome.fasta')
// Avoid
ch1 = channel.fromFilePairs('/data/*_{1,2}.fastq.gz')
ch2 = channel.value('/path/to/genome.fasta')
3. Preserve Metadata Structure
// Good - consistent metadata structure
ch_samples = channel.of(
[[id: 's1', type: 'riboseq'], 'file1.fq'],
[[id: 's2', type: 'rnaseq'], 'file2.fq']
)
// Avoid - inconsistent structure
ch_samples = channel.of(
['s1', 'file1.fq'],
[[id: 's2'], 'file2.fq'] // Inconsistent
)
4. Initialize Collection Channels Early
// Good
ch_versions = channel.empty()
ch_multiqc_files = channel.empty()
// Use throughout workflow
ch_versions = ch_versions.mix(PROCESS1.out.versions)
ch_versions = ch_versions.mix(PROCESS2.out.versions)
5. Filter Null Values Before Collection
// Good
ch_versions = ch_versions.filter { it != null }
// Before final collection
softwareVersionsToYAML(ch_versions)
6. Use Appropriate Factory Methods
// Single value - use value()
ch_genome = channel.value('/path/to/genome.fasta')
// Multiple files - use fromPath()
ch_files = channel.fromPath('/data/*.fastq.gz')
// Paired files - use fromFilePairs()
ch_paired = channel.fromFilePairs('/data/*_{1,2}.fastq.gz')
// List data - use fromList()
ch_samples = channel.fromList(['s1', 's2', 's3'])
7. Handle Empty Channels Gracefully
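// No explicit check is needed before calling a process:
// it simply does not run when its input channel is empty
PROCESS(ch_data)
// Warn if nothing arrived (channels have no isEmpty() method)
ch_data.count().subscribe { n ->
    if (n == 0) log.warn("No data available")
}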
// Or use conditional
ch_optional = params.use_optional ?
channel.fromPath('/data/file.txt') :
channel.empty()
8. Document Channel Structure
// Document in comments
// Channel structure: [meta, [fastq_1, fastq_2]]
ch_paired = channel.fromFilePairs('/data/*_{1,2}.fastq.gz')
.map { id, files -> [[id: id], files] }
9. Use set for Final Channels
// Good - use set for final channels
channel
.fromPath(params.input)
.splitCsv()
.map { ... }
.set { ch_final }
// Avoid - intermediate channels without set
ch_intermediate = channel.fromPath(params.input)
ch_final = ch_intermediate.splitCsv()
10. Test Channel Creation
// Debug channel contents (printed when run with -dump-channels)
ch_data.dump()
// Check channel size (count() returns a channel, so print it with view)
ch_data.count().view { n -> "Channel contains ${n} items" }
// Validate structure
ch_data.view { meta, file ->
    assert meta instanceof Map
    assert file instanceof Path
    "OK: ${meta}"
}
Summary Reference Table
| Factory Method | Use Case | Output |
|---|---|---|
| channel.of() | Simple values, small lists | Emits each value |
| channel.value() | Single value, broadcasting | Emits one value |
| channel.empty() | Initialize, placeholders | Empty channel |
| channel.fromPath() | File paths, glob patterns | Path objects |
| channel.fromFilePairs() | Paired files (FASTQ) | [id, [file1, file2]] |
| channel.fromList() | Groovy lists | Emits list elements |
Note: channel.from() is deprecated (as of Nextflow 19.09.0-edge). Use channel.of() or channel.fromList() instead.
Channel Operators by Category:
Transformation Operators:
- map() - Transform elements
- filter() - Filter elements
- flatMap() - Flatten nested structures
Grouping and Combining Operators:
- groupTuple() - Group by key
- join() - Join by matching keys
- combine() - Cartesian product
- mix() - Combine channels
Branching and Splitting Operators:
- branch() - Split into branches
- multiMap() - Multiple outputs
Structure Manipulation Operators:
- transpose() - Transpose structure
- unique() - Remove duplicates
Selection Operators:
- first() - First element
- last() - Last element
- take() - First N elements
Collection and Aggregation Operators:
- collect() - Collect all elements into a list
Topic Channels:
- topic: - Assign a topic for automatic collection, read back with channel.topic() (stable from Nextflow 25.04, preview before)
Text Processing Operators:
- view() - Print channel elements
- splitCsv() - Split CSV/TSV files
- splitText() - Split text into lines
Ordering Operators:
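- toSortedList() - Sort elements into a single list (Nextflow has no sort() operator)
- toList() with Groovy reverse() - Reverse order (Nextflow has no reverse() operator)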
Flow Control Operators:
- buffer() - Buffer elements
- until() - Emit until condition
- flatMap() or combine() - Repeat elements (Nextflow has no repeat() operator)
- concat() - Concatenate channels
Utility Operators:
- count() - Count elements
Debugging Operators:
- dump() - Print channel contents
References
- Nextflow Channel Documentation
- Nextflow Operators Documentation
- Nextflow DSL2 Documentation
- Current pipeline examples: workflows/riboseq/main.nf, subworkflows/**/main.nf