This document is a reference guide to Nextflow channel factories and channel operators, with examples and use cases for each.


Introduction to Channel Factories

Channel factories are methods that create Nextflow channels from various data sources. Channels are the primary mechanism for passing data between processes in Nextflow.

Channel Characteristics

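  • Asynchronous: Values flow through queue channels asynchronously, in first-in, first-out order
  • Immutable: Operators never modify a channel in place; each operator returns a new channel
  • Lazy: Channel definitions build a dataflow graph; values are emitted only when the workflow runs
  • Untyped: A channel can carry values of any type (strings, numbers, paths, lists, maps); keeping the element structure consistent is up to the workflow author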
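
A minimal sketch of how these characteristics show up in practice, assuming a DSL2 script on a recent Nextflow release. The channel names are illustrative only.

```groovy
workflow {
    // Queue channel: emits 1, 2, 3 and then completes
    ch_numbers = channel.of(1, 2, 3)

    // map() does not change ch_numbers; it returns a new channel
    ch_squares = ch_numbers.map { n -> n * n }

    // The original channel is still usable by a second consumer
    ch_numbers.view { n -> "number: ${n}" }
    ch_squares.view { sq -> "square: ${sq}" }
}
```

Both view() calls receive all three values, because in DSL2 a channel can feed any number of downstream consumers.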

Basic Channel Factories

1. channel.of()

Creates a queue channel that emits each argument as a separate value.

Syntax:

channel.of(value1, value2, value3, ...)

Examples:

// Simple values
ch_numbers = channel.of(1, 2, 3, 4, 5)

// Strings
ch_samples = channel.of('sample1', 'sample2', 'sample3')

// Mixed types
ch_mixed = channel.of('sample1', 100, true, '/path/to/file.txt')

// Tuples
ch_tuples = channel.of(['sample1', '/path/to/file1.fq'], ['sample2', '/path/to/file2.fq'])

// Empty channel
ch_empty = channel.of()

Use Cases:

  • Creating test data
  • Hardcoded reference files
  • Configuration values
  • Small, known datasets

2. channel.value()

Creates a value channel: it holds a single value that can be read any number of times without being consumed.

Syntax:

channel.value(value)

Examples:

// Single value
ch_genome = channel.value('/path/to/genome.fasta')

// Map value
ch_config = channel.value([id: 'config', path: '/path/to/config.txt'])

// File value
ch_reference = channel.value(file('/path/to/reference.fasta', checkIfExists: true))

// Used for broadcasting to multiple processes
ch_genome = channel.value('/path/to/genome.fasta')
PROCESS1(ch_input, ch_genome)
PROCESS2(ch_input, ch_genome)  // Same genome used in both

Use Cases:

  • Reference files used by multiple processes
  • Configuration values
  • Single metadata values
  • Broadcasting values to multiple processes
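
The difference between a value channel and a queue channel matters most when both feed the same process. The sketch below uses a hypothetical process named ALIGN and placeholder paths; it is an illustration, not part of any real pipeline.

```groovy
// Hypothetical process used only to show how inputs are paired
process ALIGN {
    input:
    val sample
    val genome

    output:
    stdout

    script:
    """
    echo "aligning ${sample} against ${genome}"
    """
}

workflow {
    ch_samples = channel.of('sample1', 'sample2', 'sample3')  // queue channel
    ch_genome  = channel.value('/path/to/genome.fasta')       // value channel

    // ALIGN runs three times; the value channel is reused for every sample
    ALIGN(ch_samples, ch_genome).view()
}
```

If ch_genome were created with channel.of() instead, it would be a queue channel holding one element, and ALIGN would run only once because the genome would be consumed by the first sample.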

3. channel.empty()

Creates an empty channel.

Syntax:

channel.empty()

Examples:

// Initialize empty channel for collection
ch_multiqc_files = channel.empty()
ch_versions = channel.empty()

// Conditional channel creation
ch_optional = params.use_optional ? channel.of('value') : channel.empty()

// Mix with empty channel
ch_results = ch_results.mix(channel.empty())  // No effect, but safe

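// Warn when a channel turns out to be empty
// (channels have no isEmpty() method; count() emits the number of values)
ch_data.count().subscribe { n ->
    if (n == 0) log.warn("No data available")
}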

Use Cases:

  • Initializing collection channels
  • Conditional channel creation
  • Placeholder for optional inputs
  • Testing empty channel handling

File-Based Channel Factories

4. channel.fromPath()

Creates a channel from file paths.

Syntax:

channel.fromPath(path, options)

Options:

  • checkIfExists: true - Verify file exists
  • glob: true - Enable glob patterns
  • type: 'file' or 'dir' - Specify type
  • maxDepth: 1 - Maximum directory depth for glob
  • followLinks: true - Follow symbolic links

Basic Examples:

// Single file
ch_fasta = channel.fromPath('/path/to/genome.fasta', checkIfExists: true)

// Multiple files
ch_files = channel.fromPath(['/path/to/file1.txt', '/path/to/file2.txt'])

// Glob pattern
ch_fastq = channel.fromPath('/data/*.fastq.gz', checkIfExists: true)

// Recursive glob
ch_all_fastq = channel.fromPath('/data/**/*.fastq.gz', checkIfExists: true)

// Specific file type
ch_bam_files = channel.fromPath('/data/*.bam', type: 'file', checkIfExists: true)

// Directory
ch_dirs = channel.fromPath('/data/samples/*', type: 'dir', checkIfExists: true)

// With maxDepth
ch_shallow = channel.fromPath('/data/**/*.txt', maxDepth: 2, checkIfExists: true)

// From parameter
ch_input = channel.fromPath(params.input, checkIfExists: true)

Advanced Examples:

// Multiple patterns
ch_mixed = channel.fromPath([
    '/data/*.fastq.gz',
    '/data/*.bam',
    '/data/*.vcf'
], checkIfExists: true)

// Exclude patterns (using filter)
ch_fastq = channel.fromPath('/data/*.fastq.gz', checkIfExists: true)
    .filter { !it.name.contains('unmapped') }

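// Sorted by modification time, most recent first
// toSortedList() gathers all files into one sorted list; flatMap() re-emits them one by one
ch_recent = channel.fromPath('/data/*.fastq.gz', checkIfExists: true)
    .toSortedList { a, b -> b.lastModified() <=> a.lastModified() }
    .flatMap { files -> files }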

// Creating channels from CSV/TSV files
ch_samplesheet = channel.fromPath('samplesheet.csv', checkIfExists: true)
    .splitCsv(header: true, sep: ',')
    .map { row -> [row.sample, row.fastq_1, row.fastq_2] }

// With validation
ch_samplesheet = channel.fromPath('samplesheet.csv', checkIfExists: true)
    .splitCsv(header: true, sep: ',')
    .map { row ->
        def meta = [id: row.sample, type: row.type]
        def files = row.fastq_2 ? [row.fastq_1, row.fastq_2] : [row.fastq_1]
        [meta, files]
    }

// TSV file
ch_data = channel.fromPath('data.tsv', checkIfExists: true)
    .splitCsv(header: true, sep: '\t')
    .map { row -> [row.id, row.value] }

// Creating channels from JSON files
import groovy.json.JsonSlurper

ch_json = channel.fromPath('config.json', checkIfExists: true)
    .map { file ->
        def json = new JsonSlurper().parse(file)
        [json.sample_id, json.fastq_path]
    }

Use Cases:

  • Reading input files
  • Finding files matching patterns
  • Loading reference data
  • Processing directory contents
  • Parsing structured data files (CSV, TSV, JSON)

5. channel.fromFilePairs()

Creates a channel from file pairs (e.g., paired-end FASTQ files).

Syntax:

channel.fromFilePairs(pattern, options)

Options:

  • checkIfExists: true - Verify files exist
  • size: -1 - Number of files per group (-1 = all)
  • flat: true - Flatten tuple structure

Examples:

// Standard paired-end pattern (_1/_2 suffix)
ch_paired = channel.fromFilePairs('/data/*_{1,2}.fastq.gz', checkIfExists: true)
// Emits: [sample_id, [file1, file2]]

// Custom pattern (R1/R2)
ch_paired = channel.fromFilePairs('/data/*_{R1,R2}.fastq.gz', checkIfExists: true)

// Multiple file pairs
ch_triplets = channel.fromFilePairs('/data/*_{1,2,3}.fastq.gz', size: 3, checkIfExists: true)
// Emits: [sample_id, [file1, file2, file3]]

// Flat structure
ch_flat = channel.fromFilePairs('/data/*_{1,2}.fastq.gz', flat: true, checkIfExists: true)
// Emits: [sample_id, file1, file2] (no nested list)

// With metadata
ch_paired.map { id, files -> [id: id, single_end: false, files: files] }

Use Cases:

  • Paired-end sequencing data
  • Multiple related files per sample
  • Grouping files by sample ID
  • Processing file pairs together

6. channel.fromSRA()

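Creates a channel by querying the NCBI SRA database for the given project or run accessions. This factory is built into Nextflow. It emits each accession together with the remote FASTQ files for that run. An NCBI API key is normally needed, supplied through the apiKey option or the NCBI_API_KEY environment variable.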

Syntax:

channel.fromSRA(run_ids, options)

Examples:

// Single SRA ID
ch_sra = channel.fromSRA('SRR123456')

// Multiple SRA IDs
ch_sra = channel.fromSRA(['SRR123456', 'SRR123457', 'SRR123458'])

// From file (fromSRA() takes a string or a list, not a channel)
def sra_ids = file('sra_ids.txt', checkIfExists: true)
    .readLines()
    .collect { it.trim() }
    .findAll { it }
ch_sra = channel.fromSRA(sra_ids)

Use Cases:

  • Resolving SRA accessions to FASTQ file locations
  • Processing public sequencing data
  • Batch SRA downloads

Data Structure Channel Factories

7. channel.fromList()

Creates a channel from a Groovy list.

Syntax:

channel.fromList(list)

Examples:

// Simple list
ch_list = channel.fromList([1, 2, 3, 4, 5])

// List of tuples
ch_samples = channel.fromList([
    ['sample1', '/path/to/file1.fq'],
    ['sample2', '/path/to/file2.fq'],
    ['sample3', '/path/to/file3.fq']
])

// List of maps
ch_metadata = channel.fromList([
    [id: 'sample1', type: 'riboseq', condition: 'control'],
    [id: 'sample2', type: 'riboseq', condition: 'treated'],
    [id: 'sample3', type: 'rnaseq', condition: 'control']
])

// From variable
def samples = ['sample1', 'sample2', 'sample3']
ch_samples = channel.fromList(samples)

// Nested lists
ch_nested = channel.fromList([
    ['sample1', ['file1_1.fq', 'file1_2.fq']],
    ['sample2', ['file2_1.fq', 'file2_2.fq']]
])

Use Cases:

  • Converting lists to channels
  • Hardcoded sample lists
  • Programmatically generated data
  • Small datasets

Channel Operators

Channel operators are methods that transform, filter, combine, or manipulate channels. They are grouped by functionality below.
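
Because every operator returns a new channel, operators are usually chained. A minimal sketch, assuming a DSL2 script:

```groovy
workflow {
    channel.of(1, 2, 3, 4, 5, 6)
        .filter { n -> n % 2 == 0 }   // keep even numbers: 2, 4, 6
        .map { n -> n * 10 }          // transform each value: 20, 40, 60
        .collect()                    // gather everything into one list
        .view()                       // prints [20, 40, 60]
}
```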

Transformation Operators

Transform elements or channel structure.

5. map()

Transforms each element in a channel.

Examples:

// Simple transformation
ch_numbers = channel.of(1, 2, 3, 4, 5)
ch_doubled = ch_numbers.map { it * 2 }
// Emits: 2, 4, 6, 8, 10

// Extract file name
ch_files = channel.fromPath('/data/*.fastq.gz')
ch_names = ch_files.map { it.name }
// Emits: file1.fastq.gz, file2.fastq.gz, ...

// Add metadata
ch_files = channel.fromPath('/data/*.fastq.gz')
ch_with_meta = ch_files.map { file ->
    [id: file.baseName, file: file]
}

// Complex transformation
ch_samples = channel.fromList(['sample1', 'sample2'])
ch_processed = ch_samples.map { sample ->
    [
        id: sample,
        fastq_1: "/data/${sample}_R1.fastq.gz",
        fastq_2: "/data/${sample}_R2.fastq.gz",
        type: 'riboseq'
    ]
}

6. filter()

Filters elements based on a condition.

Examples:

// Filter by value
ch_numbers = channel.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
ch_even = ch_numbers.filter { it % 2 == 0 }
// Emits: 2, 4, 6, 8, 10

// Filter files by extension
ch_files = channel.fromPath('/data/*')
ch_fastq = ch_files.filter { it.name.endsWith('.fastq.gz') }

// Filter by metadata
ch_samples = channel.fromList([
    [id: 's1', type: 'riboseq'],
    [id: 's2', type: 'rnaseq'],
    [id: 's3', type: 'riboseq']
])
ch_riboseq = ch_samples.filter { it.type == 'riboseq' }
// Emits: [id: 's1', type: 'riboseq'], [id: 's3', type: 'riboseq']

// Filter tuples
ch_data = channel.of(
    ['sample1', 'riboseq', '/path/to/file1.fq'],
    ['sample2', 'rnaseq', '/path/to/file2.fq'],
    ['sample3', 'riboseq', '/path/to/file3.fq']
)
ch_riboseq = ch_data.filter { row -> row[1] == 'riboseq' }  // type is the second element

7. flatMap()

Maps each element to multiple elements and flattens the result.

Examples:

// Expand list elements
ch_lists = channel.of([1, 2], [3, 4], [5, 6])
ch_flat = ch_lists.flatMap { it }
// Emits: 1, 2, 3, 4, 5, 6

// Generate multiple outputs per input
ch_samples = channel.of('sample1', 'sample2')
ch_files = ch_samples.flatMap { sample ->
    [
        "${sample}_R1.fastq.gz",
        "${sample}_R2.fastq.gz",
        "${sample}_R3.fastq.gz"
    ]
}
// Emits: sample1_R1.fastq.gz, sample1_R2.fastq.gz, sample1_R3.fastq.gz, sample2_R1.fastq.gz, sample2_R2.fastq.gz, sample2_R3.fastq.gz


// Process directory contents
ch_dirs = channel.fromPath('/data/samples/*', type: 'dir')
ch_files = ch_dirs.flatMap { dir ->
    dir.listFiles().findAll { it.name.endsWith('.fastq.gz') }
}

Grouping and Combining Operators

Group elements or combine multiple channels.

8. groupTuple()

Groups elements by a key (first element by default).

Examples:

// Group by first element
ch_data = channel.of(
    ['sample1', 'file1.fq'],
    ['sample1', 'file2.fq'],
    ['sample2', 'file3.fq'],
    ['sample2', 'file4.fq']
)
ch_grouped = ch_data.groupTuple()
// Emits: ['sample1', ['file1.fq', 'file2.fq']], ['sample2', ['file3.fq', 'file4.fq']]

// Group by specific index
ch_data = channel.of(
    ['sample1', 'lane1', 'file1.fq'],
    ['sample1', 'lane2', 'file2.fq'],
    ['sample2', 'lane1', 'file3.fq']
)
ch_by_sample = ch_data.groupTuple(by: 0)
// Groups by sample (first element)

// Group by metadata key
ch_data = channel.of(
    [[id: 's1', type: 'riboseq'], 'file1.fq'],
    [[id: 's1', type: 'riboseq'], 'file2.fq'],
    [[id: 's2', type: 'rnaseq'], 'file3.fq']
)
ch_grouped = ch_data.groupTuple(by: 0)
// Groups by metadata map

9. join()

Joins two channels by matching keys.

Examples:

// Join by first element (default)
ch_samples = channel.of(['sample1', 'data1'], ['sample2', 'data2'])
ch_metadata = channel.of(['sample1', 'meta1'], ['sample2', 'meta2'])
ch_joined = ch_samples.join(ch_metadata)
// Emits: ['sample1', 'data1', 'meta1'], ['sample2', 'data2', 'meta2']

// Join by specific index
ch_samples = channel.of(['sample1', 'data1'], ['sample2', 'data2'])
ch_metadata = channel.of(['sample1', 'type1', 'meta1'])
ch_joined = ch_samples.join(ch_metadata, by: 0)
// Joins on first element

// Join with multiple keys
ch_data1 = channel.of(['sample1', 'lane1', 'data1'])
ch_data2 = channel.of(['sample1', 'lane1', 'data2'])
ch_joined = ch_data1.join(ch_data2, by: [0, 1])
// Joins on sample and lane

// Join with metadata
ch_bam = channel.of(
    [[id: 's1'], '/path/to/s1.bam'],
    [[id: 's2'], '/path/to/s2.bam']
)
ch_index = channel.of(
    [[id: 's1'], '/path/to/s1.bai'],
    [[id: 's2'], '/path/to/s2.bai']
)
ch_joined = ch_bam.join(ch_index, by: 0)
// Emits: [[id: 's1'], '/path/to/s1.bam', '/path/to/s1.bai'], ...
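
By default join() silently drops keys that appear in only one channel. The sketch below, with made-up file names, contrasts the default with the remainder option.

```groovy
workflow {
    ch_bam = channel.of(['s1', 's1.bam'], ['s2', 's2.bam'])
    ch_bai = channel.of(['s1', 's1.bai'])

    // Default: only keys present in both channels are emitted
    ch_bam.join(ch_bai).view { row -> "matched: ${row}" }

    // remainder: true also emits unmatched keys, padded with null
    ch_bam.join(ch_bai, remainder: true).view { row -> "all: ${row}" }
}
```

When a missing partner should be treated as an error rather than dropped, join() also accepts failOnMismatch: true.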

10. combine()

Combines channels creating a Cartesian product.

Examples:

// Combine two channels
ch_samples = channel.of('sample1', 'sample2')
ch_treatments = channel.of('control', 'treated')
ch_combined = ch_samples.combine(ch_treatments)
// Emits: ['sample1', 'control'], ['sample1', 'treated'], ['sample2', 'control'], ['sample2', 'treated']

// Combine multiple channels
ch_samples = channel.of('s1', 's2')
ch_treatments = channel.of('ctrl', 'trt')
ch_replicates = channel.of('rep1', 'rep2')
ch_all = ch_samples.combine(ch_treatments).combine(ch_replicates)
// Creates all combinations

// Combine with metadata
ch_fasta = channel.of(['ref1', '/path/to/ref1.fa'])
ch_gtf = channel.of(['ref1', '/path/to/ref1.gtf'])
ch_combined = ch_fasta.combine(ch_gtf)
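// Emits: ['ref1', '/path/to/ref1.fa', 'ref1', '/path/to/ref1.gtf'] (tuples are flattened)
// With combine(ch_gtf, by: 0) the shared key is kept once: ['ref1', '/path/to/ref1.fa', '/path/to/ref1.gtf']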

11. mix()

Mixes multiple channels into one.

Examples:

// Mix two channels
ch_channel1 = channel.of('item1', 'item2')
ch_channel2 = channel.of('item3', 'item4')
ch_mixed = ch_channel1.mix(ch_channel2)
// Emits: item1, item2, item3, item4 (order may vary)

// Mix multiple channels
ch_versions = channel.empty()
ch_versions = ch_versions.mix(PROCESS1.out.versions)
ch_versions = ch_versions.mix(PROCESS2.out.versions)
ch_versions = ch_versions.mix(PROCESS3.out.versions)

// Mix with empty channel (safe)
ch_results = ch_results.mix(channel.empty())

Branching and Splitting Operators

Split channels into multiple channels based on conditions.

12. branch()

Branches a channel into multiple channels based on conditions.

Examples:

// Branch by value
ch_samples = channel.of('sample1', 'sample2', 'sample3')
ch_branched = ch_samples.branch {
    s1: it == 'sample1'
        return it
    s2: it == 'sample2'
        return it
    other: true  // 'default' is a reserved word and cannot be used as a label
        return it
}
// Creates: ch_branched.s1, ch_branched.s2, ch_branched.other

// Branch by metadata
ch_data = channel.of(
    [[id: 's1', type: 'riboseq'], 'file1.fq'],
    [[id: 's2', type: 'rnaseq'], 'file2.fq'],
    [[id: 's3', type: 'riboseq'], 'file3.fq']
)
ch_by_type = ch_data.branch { meta, file ->
    riboseq: meta.type == 'riboseq'
        return [meta, file]
    rnaseq: meta.type == 'rnaseq'
        return [meta, file]
}
// Creates: ch_by_type.riboseq, ch_by_type.rnaseq

// Branch by file type
ch_files = channel.fromPath('/data/*')
ch_by_ext = ch_files.branch { file ->
    fastq: file.name.endsWith('.fastq.gz')
        return file
    bam: file.name.endsWith('.bam')
        return file
    other: true  // 'default' is a reserved word and cannot be used as a label
        return file
}

13. multiMap()

Maps one channel to multiple output channels.

Examples:

// Split into multiple channels
ch_data = channel.of(
    ['sample1', 'data1', 'meta1'],
    ['sample2', 'data2', 'meta2']
)
ch_split = ch_data.multiMap { sample, data, meta ->
    samples: sample
    data: data
    metadata: meta
}
// Creates: ch_split.samples, ch_split.data, ch_split.metadata

// Complex splitting
ch_bams = channel.of(
    [[id: 's1'], '/path/to/s1.bam', '/path/to/s1.bai'],
    [[id: 's2'], '/path/to/s2.bam', '/path/to/s2.bai']
)
ch_split = ch_bams.multiMap { meta, bam, bai ->
    bam: [meta, bam, bai]
    bam_only: [meta, bam]
    bai_only: [meta, bai]
}

Structure Manipulation Operators

Modify channel structure or element order.

14. transpose()

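Expands tuples that contain list elements into one tuple per list entry. It is the inverse of groupTuple().

Examples:

// Transpose grouped tuples
ch_data = channel.of(
    ['sample1', ['file1.fq', 'file2.fq']],
    ['sample2', ['file3.fq', 'file4.fq']]
)
ch_transposed = ch_data.transpose()
// Emits: ['sample1', 'file1.fq'], ['sample1', 'file2.fq'], ['sample2', 'file3.fq'], ['sample2', 'file4.fq']

// Transpose with metadata
ch_files = channel.of(
    [[id: 's1'], ['file1.fq', 'file2.fq']],
    [[id: 's2'], ['file3.fq', 'file4.fq']]
)
ch_transposed = ch_files.transpose()
// Emits: [[id: 's1'], 'file1.fq'], [[id: 's1'], 'file2.fq'], [[id: 's2'], 'file3.fq'], [[id: 's2'], 'file4.fq']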

15. unique()

Removes duplicate elements.

Examples:

// Remove duplicates
ch_duplicates = channel.of('a', 'b', 'a', 'c', 'b', 'd')
ch_unique = ch_duplicates.unique()
// Emits: 'a', 'b', 'c', 'd'

// Unique by key (unique() takes a closure that returns the comparison key)
ch_data = channel.of(
    ['sample1', 'data1'],
    ['sample1', 'data2'],
    ['sample2', 'data3']
)
ch_unique_samples = ch_data.unique { row -> row[0] }
// Emits: ['sample1', 'data1'], ['sample2', 'data3'] (first occurrence kept)

Selection Operators

Select specific elements from channels or collect all elements.

16. first()

Takes the first element from a channel.

Examples:

// Get first element
ch_data = channel.of('item1', 'item2', 'item3')
ch_first = ch_data.first()
// Emits: 'item1'

// First with condition
ch_data = channel.of(
    ['sample1', 'type1'],
    ['sample2', 'type2'],
    ['sample3', 'type1']
)
ch_first_type1 = ch_data.filter { it[1] == 'type1' }.first()
// Emits: ['sample1', 'type1']

17. last()

Takes the last element from a channel.

Examples:

// Get last element
ch_data = channel.of('item1', 'item2', 'item3')
ch_last = ch_data.last()
// Emits: 'item3'

18. take()

Takes the first N elements.

Examples:

// Take first 5 elements
ch_data = channel.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
ch_first5 = ch_data.take(5)
// Emits: 1, 2, 3, 4, 5

Collection and Aggregation Operators

Collect or aggregate channel elements.

20. collect()

Collects all elements from a channel into a single list.

Examples:

// Collect all elements
ch_data = channel.of(1, 2, 3, 4, 5)
ch_collected = ch_data.collect()
// Emits: [1, 2, 3, 4, 5] (single list)

// Collect with grouping
ch_samples = channel.of(
    ['sample1', 'file1.fq'],
    ['sample1', 'file2.fq'],
    ['sample2', 'file3.fq']
)

ch_grouped = ch_samples.groupTuple().collect()
// Emits: [['sample1', ['file1.fq', 'file2.fq']], ['sample2', ['file3.fq']]]

// Collect for MultiQC
ch_multiqc_files = channel.empty()
ch_multiqc_files = ch_multiqc_files.mix(PROCESS1.out.html)
ch_multiqc_files = ch_multiqc_files.mix(PROCESS2.out.html)
MULTIQC(ch_multiqc_files.collect())

Use Cases:

  • Collecting all channel elements for a single process execution
  • Preparing inputs for tools that require all data at once (e.g., MultiQC)
  • Converting channel to list for further processing

Topic Channels

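Automatic channel collection using topics (Nextflow 24.04+; in 24.04 this is a preview feature enabled with nextflow.preview.topic = true).

21. topic and channel.topic()

The topic output option sends a process output to a named topic. The channel.topic() factory returns a channel containing every value sent to that topic, from any process in the pipeline.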

Examples:

// Emit to topic channel
process PROCESS1 {
    output:
    path "versions.yml", topic: "versions"
    
    script:
    """
    # ... process code ...
    """
}

process PROCESS2 {
    output:
    path "versions.yml", topic: "versions"
    
    script:
    """
    # ... process code ...
    """
}

// In the workflow, read the collected values with channel.topic()
workflow {
    PROCESS1()
    PROCESS2()
    
    // Contains all versions.yml files from processes
    // emitting to the "versions" topic
    ch_versions = channel.topic('versions')
}

// Multiple topics
process PROCESS {
    output:
    path "output1.txt", topic: "results"
    path "output2.txt", topic: "logs"

    script:
    """
    # ... process code ...
    """
}

workflow {
    PROCESS()
    ch_results = channel.topic('results')  // All output1.txt files
    ch_logs = channel.topic('logs')        // All output2.txt files
}

Use Cases:

  • Automatic collection of version files from multiple processes
  • Grouping related outputs without manual mixing
  • Simplifying channel management in complex workflows

Text Processing Operators

Process text-based data formats.

22. view()

Prints channel elements to console (for debugging).

Examples:

// View channel contents
ch_data = channel.of('item1', 'item2', 'item3')
ch_data.view()
// Prints: item1, item2, item3

// View with custom message
ch_samples = channel.of('sample1', 'sample2')
ch_samples.view { "Processing: ${it}" }
// Prints: Processing: sample1, Processing: sample2

// View tuples
ch_data = channel.of(['sample1', 'file1.fq'], ['sample2', 'file2.fq'])
ch_data.view { meta, file -> "Sample: ${meta}, File: ${file}" }
// Prints: Sample: sample1, File: file1.fq, etc.

Use Cases:

  • Debugging channel contents
  • Monitoring workflow progress
  • Inspecting data flow

23. splitCsv()

Splits CSV/TSV files into rows.

Examples:

// Split CSV file
ch_csv = channel.fromPath('samplesheet.csv', checkIfExists: true)
ch_rows = ch_csv.splitCsv()
// Emits each row as a list

// Split CSV with header
ch_csv = channel.fromPath('samplesheet.csv', checkIfExists: true)
ch_rows = ch_csv.splitCsv(header: true)
// Emits each row as a map with column names as keys

// Split TSV
ch_tsv = channel.fromPath('data.tsv', checkIfExists: true)
ch_rows = ch_tsv.splitCsv(header: true, sep: '\t')

// Split with custom separator
ch_data = channel.fromPath('data.txt', checkIfExists: true)
ch_rows = ch_data.splitCsv(sep: '|')

// Process CSV rows
ch_csv = channel.fromPath('samplesheet.csv', checkIfExists: true)
ch_samples = ch_csv.splitCsv(header: true, sep: ',')
    .map { row ->
        def meta = [id: row.sample, type: row.type]
        def files = row.fastq_2 ? [row.fastq_1, row.fastq_2] : [row.fastq_1]
        [meta, files]
    }

Use Cases:

  • Parsing samplesheet files
  • Processing tabular data
  • Converting CSV to channel format

24. splitText()

Splits text files into lines.

Examples:

// Split text file into lines
ch_text = channel.fromPath('data.txt', checkIfExists: true)
ch_lines = ch_text.splitText()
// Emits each line as a separate element

// Split into chunks of several lines (by takes a line count, not a separator)
ch_text = channel.fromPath('data.txt', checkIfExists: true)
ch_chunks = ch_text.splitText(by: 10)  // Each emitted value holds 10 lines

// Split with limit
ch_text = channel.fromPath('data.txt', checkIfExists: true)
ch_lines = ch_text.splitText(limit: 100)  // First 100 lines

// Process lines
ch_ids = channel.fromPath('sample_ids.txt', checkIfExists: true)
ch_samples = ch_ids.splitText()
    .map { it.trim() }
    .filter { it != '' && !it.startsWith('#') }

Use Cases:

  • Processing line-by-line data
  • Parsing configuration files
  • Reading lists of IDs or paths

Ordering Operators

Sort and reorder channel elements.

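25. toSortedList()

Nextflow has no sort() channel operator. toSortedList() collects all elements and emits them as a single sorted list; use flatMap() to re-emit the sorted elements one by one.

Examples:

// Sort by value
ch_data = channel.of(3, 1, 4, 1, 5, 9, 2, 6)
ch_sorted = ch_data.toSortedList()
// Emits: [1, 1, 2, 3, 4, 5, 6, 9] (single list)

// Re-emit the sorted elements individually
ch_sorted_items = ch_data.toSortedList().flatMap { it }
// Emits: 1, 1, 2, 3, 4, 5, 6, 9

// Sort files by name with a custom comparator
ch_files = channel.fromPath('/data/*.fastq.gz')
ch_sorted = ch_files.toSortedList { a, b -> a.name <=> b.name }

// Sort by modification time
ch_files = channel.fromPath('/data/*.fastq.gz')
ch_sorted = ch_files.toSortedList { a, b -> a.lastModified() <=> b.lastModified() }

// Sort tuples
ch_data = channel.of(
    ['sample2', 'file2.fq'],
    ['sample1', 'file1.fq'],
    ['sample3', 'file3.fq']
)
ch_sorted = ch_data.toSortedList { a, b -> a[0] <=> b[0] }  // Sort by first element
// Emits: [['sample1', 'file1.fq'], ['sample2', 'file2.fq'], ['sample3', 'file3.fq']]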

Use Cases:

  • Ordering files chronologically
  • Sorting samples alphabetically
  • Preparing ordered inputs

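26. Reversing order

Nextflow has no reverse() channel operator. Collect the elements into a list, reverse the list, and re-emit it.

Examples:

// Reverse order
ch_data = channel.of(1, 2, 3, 4, 5)
ch_reversed = ch_data.collect().flatMap { items -> items.reverse() }
// Emits: 5, 4, 3, 2, 1

// Most recent files first (sort in descending order directly)
ch_files = channel.fromPath('/data/*.fastq.gz')
ch_recent_first = ch_files
    .toSortedList { a, b -> b.lastModified() <=> a.lastModified() }
    .flatMap { files -> files }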

Use Cases:

  • Getting most recent files first
  • Reversing processing order
  • Last-in-first-out processing

Flow Control Operators

Control channel flow and buffering.

27. buffer()

Buffers channel elements into groups.

Examples:

// Buffer by size
ch_data = channel.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
ch_buffered = ch_data.buffer(size: 3, remainder: true)
// Emits: [1, 2, 3], [4, 5, 6], [7, 8, 9], [10]
// Without remainder: true the incomplete final group [10] is dropped

// Buffer until a closing condition
// The closure is called on each element; a match closes the current group
ch_data = channel.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
ch_buffered = ch_data.buffer { it % 4 == 0 }
// Emits: [1, 2, 3, 4], [5, 6, 7, 8] (9 and 10 never reach a closing element)

// Note: buffer() has no time-based mode

Use Cases:

  • Batching operations
  • Grouping elements for batch processing
  • Splitting a stream at marker values

28. until()

Emits elements until a condition is met.

Examples:

// Until condition
ch_data = channel.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
ch_until = ch_data.until { it > 5 }
// Emits: 1, 2, 3, 4, 5 (stops when value > 5)

// Stop at the first non-empty file (it and all later files are not emitted)
ch_files = channel.fromPath('/data/*.fastq.gz')
ch_until = ch_files.until { it.size() > 0 }

Use Cases:

  • Conditional processing
  • Early termination
  • Processing until condition met

29. Repeating elements

Nextflow has no repeat() channel operator. Use flatMap() to emit several copies of each element, and combine() to build cross-products.

Examples:

// Repeat each element three times
ch_data = channel.of('sample1', 'sample2')
ch_repeated = ch_data.flatMap { sample -> [sample] * 3 }
// Emits: sample1, sample1, sample1, sample2, sample2, sample2

// Pair every sample with every treatment
ch_samples = channel.of('sample1', 'sample2')
ch_treatments = channel.of('control', 'treated')
ch_combined = ch_samples.combine(ch_treatments)
// Creates all sample-treatment combinations

Use Cases:

  • Repeating values for combination
  • Creating multiple copies
  • Expanding channel for cross-products

30. concat()

Concatenates multiple channels sequentially.

Examples:

// Concatenate channels
ch1 = channel.of('a', 'b', 'c')
ch2 = channel.of('d', 'e', 'f')
ch_concat = ch1.concat(ch2)
// Emits: a, b, c, d, e, f (in order)

// Concatenate multiple channels
ch1 = channel.of(1, 2)
ch2 = channel.of(3, 4)
ch3 = channel.of(5, 6)
ch_all = ch1.concat(ch2).concat(ch3)
// Emits: 1, 2, 3, 4, 5, 6

Use Cases:

  • Sequential processing
  • Maintaining order
  • Combining channels in sequence

Utility Operators

Utility functions for channel operations.

31. count()

Counts the number of elements in a channel.

Examples:

// Count elements
ch_data = channel.of(1, 2, 3, 4, 5)
ch_count = ch_data.count()
// Emits: 5

// Count with condition
ch_files = channel.fromPath('/data/*.fastq.gz')
ch_count = ch_files.filter { it.size() > 0 }.count()
// Counts non-empty files

Use Cases:

  • Validating channel size
  • Counting elements
  • Checking data availability

Debugging Operators

Utilities for debugging and inspecting channels.

19. dump()

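Prints channel contents for debugging, but only when the pipeline is run with the -dump-channels option.

Examples:

// Debug channel contents
ch_data = channel.of('item1', 'item2', 'item3')
ch_data.dump(tag: 'data')
// Prints the elements when run with: nextflow run main.nf -dump-channels data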

Common Patterns and Use Cases

Pattern 1: Creating Input Channels from Samplesheets

// From CSV samplesheet
channel
    .fromPath(params.input, checkIfExists: true)
    .splitCsv(header: true, sep: ',')
    .map { row ->
        def meta = [
            id: row.sample,
            type: row.type,
            condition: row.condition
        ]
        def files = row.fastq_2 ? 
            [file(row.fastq_1, checkIfExists: true), file(row.fastq_2, checkIfExists: true)] :
            [file(row.fastq_1, checkIfExists: true)]
        [meta, files]
    }
    .set { ch_fastq }

Pattern 2: Grouping Files by Sample

// Group multiple files per sample
channel
    .fromFilePairs('/data/*_{1,2}.fastq.gz', checkIfExists: true)
    .map { sample_id, files ->
        [
            [id: sample_id, single_end: false],
            files
        ]
    }
    .set { ch_paired_fastq }

Pattern 3: Combining Reference Files

// Combine FASTA and GTF
ch_fasta = channel.value(file(params.fasta, checkIfExists: true))
ch_gtf = channel.value(file(params.gtf, checkIfExists: true))
ch_fasta_gtf = ch_fasta.combine(ch_gtf)
    .map { fasta, gtf -> [[:], fasta, gtf] }
    .first()

Pattern 4: Branching by Sample Type

// Branch samples by type
ch_samples
    .branch { meta, files ->
        riboseq: meta.type == 'riboseq'
            return [meta, files]
        rnaseq: meta.type == 'rnaseq'
            return [meta, files]
        tiseq: meta.type == 'tiseq'
            return [meta, files]
    }
    .set {
        ch_samples_by_type
    }

Pattern 5: Joining BAMs with Indexes

// Join BAM files with their indexes
ch_bam = channel.of(
    [[id: 's1'], '/path/to/s1.bam'],
    [[id: 's2'], '/path/to/s2.bam']
)
ch_bai = channel.of(
    [[id: 's1'], '/path/to/s1.bai'],
    [[id: 's2'], '/path/to/s2.bai']
)
ch_bam_indexed = ch_bam.join(ch_bai, by: 0)
// Emits: [[id: 's1'], '/path/to/s1.bam', '/path/to/s1.bai'], ...

Pattern 6: Collecting Versions

// Initialize and collect versions
ch_versions = channel.empty()
ch_versions = ch_versions.mix(PROCESS1.out.versions)
ch_versions = ch_versions.mix(PROCESS2.out.versions)
ch_versions = ch_versions.mix(PROCESS3.out.versions)
ch_versions = ch_versions.filter { it != null }

Pattern 7: Transforming for Module Inputs

// Add metadata wrapper for modules
ch_files = channel.fromPath('/data/*.fastq.gz')
ch_for_module = ch_files.map { file ->
    [[id: file.baseName], file]
}

// Transform reference files
ch_fasta = channel.value('/path/to/genome.fasta')
ch_for_module = ch_fasta.map { fasta -> [[:], fasta] }

Pattern 8: Filtering and Validation

// Filter and validate
ch_files = channel.fromPath('/data/*.fastq.gz', checkIfExists: true)
ch_valid = ch_files
    .filter { it.size() > 0 }  // Non-empty files
    .filter { it.name.contains('_R1') || it.name.contains('_R2') }  // Paired-end pattern
    .map { file ->
        if (!file.exists()) {
            log.error("File not found: ${file}")
            return null
        }
        file
    }
    .filter { it != null }

Pattern 9: Conditional Channel Creation

// Create channels conditionally
ch_optional = params.use_optional ? 
    channel.fromPath('/data/optional.txt', checkIfExists: true) : 
    channel.empty()

// Multiple conditionals
ch_data = params.input_type == 'file' ?
    channel.fromPath(params.input, checkIfExists: true) :
    params.input_type == 'list' ?
    channel.fromList(params.input.tokenize(',')) :  // tokenize() returns a List; split() returns an array
    channel.empty()

// Note: channel.from() is deprecated. Use channel.of() or channel.fromList() instead.

Pattern 10: Splitting and Recombining

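// Split, process, recombine
// process_sample() and get_metadata() are user-defined helper functions
ch_samples = channel.of('sample1', 'sample2', 'sample3')
ch_split = ch_samples.multiMap { sample ->
    processed: [sample, process_sample(sample)]
    metadata: [sample, get_metadata(sample)]
}
// join() pairs the two outputs by sample; combine() would emit every processed/metadata combination
ch_recombined = ch_split.processed
    .join(ch_split.metadata)
    .map { sample, processed, meta -> [meta, processed] }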

Best Practices

1. Always Use checkIfExists: true for File Paths

// Good
ch_files = channel.fromPath('/data/*.fastq.gz', checkIfExists: true)

// Avoid
ch_files = channel.fromPath('/data/*.fastq.gz')  // No validation

2. Use Descriptive Channel Names

// Good
ch_fastq_paired = channel.fromFilePairs('/data/*_{1,2}.fastq.gz')
ch_genome_fasta = channel.value('/path/to/genome.fasta')

// Avoid
ch1 = channel.fromFilePairs('/data/*_{1,2}.fastq.gz')
ch2 = channel.value('/path/to/genome.fasta')

3. Preserve Metadata Structure

// Good - consistent metadata structure
ch_samples = channel.of(
    [[id: 's1', type: 'riboseq'], 'file1.fq'],
    [[id: 's2', type: 'rnaseq'], 'file2.fq']
)

// Avoid - inconsistent structure
ch_samples = channel.of(
    ['s1', 'file1.fq'],
    [[id: 's2'], 'file2.fq']  // Inconsistent
)

4. Initialize Collection Channels Early

// Good
ch_versions = channel.empty()
ch_multiqc_files = channel.empty()

// Use throughout workflow
ch_versions = ch_versions.mix(PROCESS1.out.versions)
ch_versions = ch_versions.mix(PROCESS2.out.versions)

5. Filter Null Values Before Collection

// Good
ch_versions = ch_versions.filter { it != null }

// Before final collection
softwareVersionsToYAML(ch_versions)

6. Use Appropriate Factory Methods

// Single value - use value()
ch_genome = channel.value('/path/to/genome.fasta')

// Multiple files - use fromPath()
ch_files = channel.fromPath('/data/*.fastq.gz')

// Paired files - use fromFilePairs()
ch_paired = channel.fromFilePairs('/data/*_{1,2}.fastq.gz')

// List data - use fromList()
ch_samples = channel.fromList(['s1', 's2', 's3'])

7. Handle Empty Channels Gracefully

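// No check is needed before calling a process:
// it is simply not executed when its input channel is empty
PROCESS(ch_data)

// To fail early when data is required, use ifEmpty()
ch_required = ch_data.ifEmpty { error("No data available") }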

// Or use conditional
ch_optional = params.use_optional ? 
    channel.fromPath('/data/file.txt') : 
    channel.empty()

8. Document Channel Structure

// Document in comments
// Channel structure: [meta, [fastq_1, fastq_2]]
ch_paired = channel.fromFilePairs('/data/*_{1,2}.fastq.gz')
    .map { id, files -> [[id: id], files] }

9. Use set for Final Channels

// Good - use set for final channels
channel
    .fromPath(params.input)
    .splitCsv()
    .map { ... }
    .set { ch_final }

// Avoid - intermediate channels without set
ch_intermediate = channel.fromPath(params.input)
ch_final = ch_intermediate.splitCsv()

10. Test Channel Creation

// Debug channel contents
ch_data.dump()

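// Check channel size (count() returns a channel, so print it with view)
ch_data.count().view { n -> "Channel contains ${n} items" }

// Validate structure and pass the elements through unchanged
ch_validated = ch_data.map { meta, file ->
    assert meta instanceof Map
    assert file instanceof Path
    [meta, file]
}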

Summary Reference Table

Factory Method          | Use Case                   | Output
channel.of()            | Simple values, small lists | Emits each value
channel.value()         | Single value, broadcasting | Emits one value
channel.empty()         | Initialize, placeholders   | Empty channel
channel.fromPath()      | File paths, glob patterns  | Path objects
channel.fromFilePairs() | Paired files (FASTQ)       | [id, [file1, file2]]
channel.fromList()      | Groovy lists               | Emits list elements

Note: channel.from() is deprecated (as of Nextflow 19.09.0-edge). Use channel.of() or channel.fromList() instead.

Channel Operators by Category:

Transformation Operators:

  • map() - Transform elements
  • filter() - Filter elements
  • flatMap() - Flatten nested structures

Grouping and Combining Operators:

  • groupTuple() - Group by key
  • join() - Join by matching keys
  • combine() - Cartesian product
  • mix() - Combine channels

Branching and Splitting Operators:

  • branch() - Split into branches
  • multiMap() - Multiple outputs

Structure Manipulation Operators:

  • transpose() - Expand list elements into separate tuples (inverse of groupTuple())
  • unique() - Remove duplicates

Selection Operators:

  • first() - First element
  • last() - Last element
  • take() - First N elements

Collection and Aggregation Operators:

  • collect() - Collect all elements into a list

Topic Channels:

  • topic: - Assign topic for automatic collection (Nextflow 24.04+)

Text Processing Operators:

  • view() - Print channel elements
  • splitCsv() - Split CSV/TSV files
  • splitText() - Split text into lines

Ordering Operators:

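  • toSortedList() - Sort elements into a single list (there is no sort() operator)
  • Reversal - No reverse() operator; collect the elements and reverse the list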

Flow Control Operators:

  • buffer() - Buffer elements
  • until() - Emit until condition
  • Repetition - No repeat() operator; use flatMap() or combine()
  • concat() - Concatenate channels

Utility Operators:

  • count() - Count elements

Debugging Operators:

  • dump() - Print channel contents

References