Tag Archives: split-job

Split-Job 0.93

Wow, it has been a long time since I had time to post anything on this blog. However, I have been making some improvements to the Split-Job script over the last few months and the results are below. I use this script myself a lot and I hope this is useful for some of you as well.  Please see the previous version for more information about the concept and usage of Split-Job.

The changes are listed in the script comments. One aspect that deserves extra attention is the runspace environment. As before, the runspaces that process the pipeline objects do not necessarily have the same variables, functions etc. that your main PowerShell session contains. If you do need those, you now have two options:

  • Specify the -UseProfile switch. This will load your PS profile, which should give you (most of) your snapins, functions and aliases. This is most useful when using Split-Job interactively at the console.
  • Import the specific functions, variables etc. you need by using the SnapIn, Function, Alias and Variable switches. Each of these can take multiple arguments and wildcards. This is more efficient when using Split-Job inside a script. 

I am actually considering making -UseProfile the default and creating the opposite -NoProfile switch instead. This will make it even easier to use Split-Job at the command line. In my case this does not slow things down considerably. If you want, you can exclude portions of your profile (e.g. PowerTab) by testing whether $SplitJobRunSpace exists. Feedback is welcome!

Enjoy,

Arnoud


#requires -version 1.0
################################################################################
## Run commands in multiple concurrent pipelines
##   by Arnoud Jansveld - www.jansveld.net/powershell
## Version History
## 0.93   Improve error handling: errors originating in the Scriptblock now
##        have more meaningful output
##        Show additional info in the progress bar (thanks Stephen Mills)
##        Add SnapIn parameter: imports (registered) PowerShell snapins
##        Add Function parameter: imports functions
##        Add SplitJobRunSpace variable; allows scripts to test if they are 
##        running in a runspace 
##        Add seconds remaining to progress bar (experimental)
## 0.92   Add UseProfile switch: imports the PS profile
##        Add Variable parameter: imports variables
##        Add Alias parameter: imports aliases
##        Restart pipeline if it stops due to an error
##        Set the current path in each runspace to that of the calling process
## 0.91   Revert to v 0.8 input syntax for the script block
##        Add error handling for empty input queue
## 0.9    Add logic to distinguish between scriptblocks and cmdlets or scripts:
##        if a ScriptBlock is specified, a foreach {} wrapper is added
## 0.8    Adds a progress bar
## 0.7    Stop adding runspaces if the queue is already empty
## 0.6    First version. Inspired by Gaurhoth's New-TaskPool script
################################################################################

function Split-Job {
    # Run a command, script or script block concurrently in several runspaces.
    # All pipeline input is first collected into one shared, synchronized queue;
    # every runspace then dequeues objects from it and pipes them into $Scriptblock.
    #
    # Parameters:
    #   Scriptblock   Command text or script block that receives the queued objects
    #                 as pipeline input (mandatory; throws when omitted).
    #   MaxPipelines  Upper limit for the number of concurrent pipelines (default 10).
    #   UseProfile    Dot-source $PROFILE in every runspace.
    #   Variable      Names of variables to copy from the caller into each runspace.
    #   Function      Names of functions to recreate in each runspace.
    #   Alias         Names of aliases to recreate in each runspace.
    #   SnapIn        Names of registered snap-ins to add to each runspace.
    #
    # Output: the objects read from the runspaces' output streams. Errors read
    # from the runspaces are sent to Out-Default.
    param (
        $Scriptblock = $(throw 'You must specify a command or script block!'),
        [int]$MaxPipelines=10,
        [switch]$UseProfile,
        [string[]]$Variable,
        [string[]]$Function = @(),
        [string[]]$Alias = @(),
        [string[]]$SnapIn
    ) 

    function Init ($InputQueue){
        # This function is dot-sourced by the main body, so every variable that is
        # assigned here ends up in the scope of Split-Job itself.

        # Create the shared thread-safe queue and fill it with the input objects
        $Queue = [Collections.Queue]::Synchronized([Collections.Queue]@($InputQueue))
        $QueueLength = $Queue.Count
        # Do not create more runspaces than input objects
        if ($MaxPipelines -gt $QueueLength) {$MaxPipelines = $QueueLength}
        # Create the script to be run by each runspace. The current location is
        # embedded in a single-quoted literal, so any apostrophe in the path has to
        # be doubled or the generated script would not parse.
        $QuotedLocation = "$PWD" -replace "'", "''"
        $Script  = "Set-Location '$QuotedLocation'; "
        $Script += {
            $SplitJobQueue = $($Input)
            & {
                trap {continue}
                while ($SplitJobQueue.Count) {$SplitJobQueue.Dequeue()}
            } |
        }.ToString() + $Scriptblock

        # Create an array to keep track of the set of pipelines
        $Pipelines = New-Object System.Collections.ArrayList

        # Collect the functions and aliases to import
        $ImportItems = ($Function -replace '^','Function:') +
            ($Alias -replace '^','Alias:') |
            Get-Item | select PSPath, Definition
        # The stopwatch drives the progress bar updates and the time estimate
        $stopwatch = New-Object System.Diagnostics.Stopwatch
        $stopwatch.Start()
    }

    function Add-Pipeline {
        # This creates a new runspace and starts an asynchronous pipeline with our script.
        # It will automatically start processing objects from the shared queue.
        $Runspace = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace($Host)
        $Runspace.Open()
        # Lets scripts (e.g. a profile) detect that they run inside a Split-Job runspace
        $Runspace.SessionStateProxy.SetVariable('SplitJobRunSpace', $True)

        function CreatePipeline {
            # Synchronously run $Scriptblock in the new runspace, optionally feeding
            # it $Data as pipeline input; any output of the pipeline is discarded.
            param ($Data, $Scriptblock)
            $Pipeline = $Runspace.CreatePipeline($Scriptblock)
            if ($Data) {
                $Null = $Pipeline.Input.Write($Data, $True)
                $Pipeline.Input.Close()
            }
            $Null = $Pipeline.Invoke()
            $Pipeline.Dispose()
        }

        # Optionally import profile, variables, functions and aliases from the main runspace
        if ($UseProfile) {
            # Same quoting concern as for the location: double embedded apostrophes
            $QuotedProfile = "$PROFILE" -replace "'", "''"
            CreatePipeline -Scriptblock "`$PROFILE = '$QuotedProfile'; . `$PROFILE"
        }
        if ($Variable) {
            # Scope 2 is the caller of Split-Job (0 = Add-Pipeline, 1 = Split-Job)
            foreach ($var in (Get-Variable $Variable -Scope 2)) {
                # Skip variables that cannot be set in the new runspace
                trap {continue}
                $Runspace.SessionStateProxy.SetVariable($var.Name, $var.Value)
            }
        }
        if ($ImportItems) {
            CreatePipeline $ImportItems {
                foreach ($item in $Input) {New-Item -Path $item.PSPath -Value $item.Definition}
            }
        }
        if ($SnapIn) {
            CreatePipeline (Get-PSSnapin $Snapin -Registered) {$Input | Add-PSSnapin}
        }
        $Pipeline = $Runspace.CreatePipeline($Script)
        # Write the queue itself as the single input object of the pipeline
        $Null = $Pipeline.Input.Write($Queue)
        $Pipeline.Input.Close()
        $Pipeline.InvokeAsync()
        $Null = $Pipelines.Add($Pipeline)
    }

    function Remove-Pipeline ($Pipeline) {
        # Remove a pipeline and runspace when it is done
        $Pipeline.RunSpace.Close()
        $Pipeline.Dispose()
        $Pipelines.Remove($Pipeline)
    }

    # Main 
    # Initialize the queue from the pipeline
    . Init $Input
    # Start the pipelines
    while ($Pipelines.Count -lt $MaxPipelines -and $Queue.Count) {Add-Pipeline} 

    # Initialize explicitly; otherwise a variable with the same name in a calling
    # scope would be picked up by the first progress check
    $LastUpdate = 0

    # Loop through the runspaces and pass their output to the main pipeline
    while ($Pipelines.Count) {
        # Only update the progress bar once a second
        if (($stopwatch.ElapsedMilliseconds - $LastUpdate) -gt 1000) {
            # Objects that are neither queued nor (assumed) in progress count as completed
            $Completed = $QueueLength - $Queue.Count - $Pipelines.count
            $LastUpdate = $stopwatch.ElapsedMilliseconds
            # Estimate from the average time per completed object; -1 if nothing completed yet
            $SecondsRemaining = $(if ($Completed) {
                (($Queue.Count + $Pipelines.Count)*$LastUpdate/1000/$Completed)
            } else {-1})
            Write-Progress 'Split-Job' ("Queues: $($Pipelines.Count)  Total: $($QueueLength)  " +
            "Completed: $Completed  Pending: $($Queue.Count)")  `
            -PercentComplete ([Math]::Max((100-[Int]($Queue.Count+$Pipelines.Count)/$QueueLength*100),0)) `
            -CurrentOperation "Next item: $(trap {continue}; if ($Queue.Count) {$Queue.Peek()})" `
            -SecondsRemaining $SecondsRemaining
        }
        # Iterate over a copy, because the loop body adds to and removes from $Pipelines
        foreach ($Pipeline in @($Pipelines)) {
            if ( -not $Pipeline.Output.EndOfPipeline -or -not $Pipeline.Error.EndOfPipeline ) {
                $Pipeline.Output.NonBlockingRead()
                $Pipeline.Error.NonBlockingRead() | Out-Default
            } else {
                # Pipeline has stopped; if there was an error show info and restart it
                if ($Pipeline.PipelineStateInfo.State -eq 'Failed') {
                    $Pipeline.PipelineStateInfo.Reason.ErrorRecord |
                        Add-Member NoteProperty writeErrorStream $True -PassThru |
                            Out-Default
                    # Restart the runspace, but only if at least one object was taken
                    # from the queue before the failure
                    if ($Queue.Count -lt $QueueLength) {Add-Pipeline}
                }
                Remove-Pipeline $Pipeline
            }
        }
        Start-Sleep -Milliseconds 100
    }
}

Split-Job 0.92

Here is an update to the Split-Job function. Based in part on some of the comments on the previous version, I made the following changes:

  1. The format for the scriptblock has changed; this was done to make it more straightforward to specify parameters for those commands/scripts that accept pipeline input. If you need a foreach (%) you will have to include that in the command line. Examples:

    "Server1","Server2","Server3" | Split-Job { c:\test.ps1 -Force }

    "Server1","Server2","Server3" | Split-Job { % {Get-WmiObject Win32_ComputerSystem -ComputerName $_}}

  2. You can now import your profile, variables and/or aliases into the runspaces. This is somewhat of an experiment; I am not convinced this is even a good idea. Please give me your feedback if you think this is useful.
  3. Each runspace will have its current directory ($PWD) set to that of the main runspace.

There is also some error handling code to make the script more robust.

Enjoy!

Arnoud

#requires -version 1.0
################################################################################
## Run commands in multiple concurrent pipelines
##   by Arnoud Jansveld - www.jansveld.net/powershell
## Version History
## 0.92   Add UseProfile switch: imports the PS profile into each runspace
##        Add Variable parameter: imports variable(s) into each runspace
##        Add Alias parameter: imports alias(es)
##        Restart pipeline if it stops due to an error
##        Set the current path in each runspace to that of the calling process
## 0.91   Revert to v 0.8 input syntax for the script block
##        Add error handling for empty input queue
## 0.9    Add logic to distinguish between scriptblocks and cmdlets or scripts:
##        if a ScriptBlock is specified, a foreach {} wrapper is added
## 0.8    Adds a progress bar
## 0.7    Stop adding runspaces if the queue is already empty
## 0.6    First version. Inspired by Gaurhoth's New-TaskPool script
################################################################################

function Split-Job (
    $Scriptblock = $(throw 'You must specify a command or script block!'),
    [int]$MaxPipelines=10,
    [switch]$UseProfile,
    [string[]]$Variable,
    [string[]]$Alias

) {
    # Run a command, script or script block concurrently in several runspaces.
    # All pipeline input is first collected into one shared, synchronized queue;
    # every runspace then dequeues objects from it and pipes them into $Scriptblock.
    #
    # Parameters:
    #   Scriptblock   Command text or script block that receives the queued objects
    #                 as pipeline input (mandatory; throws when omitted).
    #   MaxPipelines  Upper limit for the number of concurrent pipelines (default 10).
    #   UseProfile    Dot-source $PROFILE in every runspace.
    #   Variable      Names of variables to copy from the caller into each runspace.
    #   Alias         Names of aliases to copy from the caller into each runspace.
    #
    # Output: the objects read from the runspaces' output streams; errors read from
    # the runspaces are passed to Write-Error.

    # Create the shared thread-safe queue and fill it with the input objects
    $Queue = [Collections.Queue]::Synchronized([Collections.Queue]@($Input))
    $QueueLength = $Queue.Count
    # Do not create more runspaces than input objects
    if ($MaxPipelines -gt $QueueLength) {$MaxPipelines = $QueueLength}
    # Set up the script to be run by each runspace. The current location is embedded
    # in a single-quoted literal, so any apostrophe in the path has to be doubled
    # or the generated script would not parse.
    $QuotedLocation = "$PWD" -replace "'", "''"
    $Script  = "Set-Location '$QuotedLocation'; "
    $Script += '$Queue = $($Input); '
    # The trap covers the case where another runspace empties the queue between
    # the Count check and the Dequeue call
    $Script += '& {trap {continue}; while ($Queue.Count) {$Queue.Dequeue()}} |'
    $Script += $Scriptblock

    # Create an array to keep track of the set of pipelines
    $Pipelines = New-Object System.Collections.ArrayList

    function Add-Pipeline {
        # This creates a new runspace and starts an asynchronous pipeline with our script.
        # It will automatically start processing objects from the shared queue.
        $Runspace = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace($Host)
        $Runspace.Open()
        # Optionally import profile, variables and aliases from the main runspace
        if ($UseProfile) {
            $QuotedProfile = "$PROFILE" -replace "'", "''"
            $Pipeline = $Runspace.CreatePipeline(". '$QuotedProfile'")
            # Discard whatever the profile writes, so it does not end up in our output
            $Null = $Pipeline.Invoke()
            $Pipeline.Dispose()
        }
        if ($Variable) {
            # Scope 2 is the caller of Split-Job (0 = Add-Pipeline, 1 = Split-Job)
            Get-Variable $Variable -Scope 2 | foreach {
                # Skip variables that cannot be set in the new runspace
                trap {continue}
                $Runspace.SessionStateProxy.SetVariable($_.Name, $_.Value)
            }
        }
        if ($Alias) {
            $Pipeline = $Runspace.CreatePipeline({$Input | Set-Alias -value {$_.Definition}})
            $Null = $Pipeline.Input.Write((Get-Alias $Alias -Scope 2), $True)
            $Pipeline.Input.Close()
            $Null = $Pipeline.Invoke()
            $Pipeline.Dispose()
        }
        $Pipeline = $Runspace.CreatePipeline($Script)
        # Write the queue itself as the single input object of the pipeline
        $Null = $Pipeline.Input.Write($Queue)
        $Pipeline.Input.Close()
        $Pipeline.InvokeAsync()
        $Null = $Pipelines.Add($Pipeline)
    }

    function Remove-Pipeline ($Pipeline) {
        # Remove a pipeline and runspace when it is done
        $Pipeline.RunSpace.Close()
        $Pipeline.Dispose()
        $Pipelines.Remove($Pipeline)
    }

    # Start the pipelines
    while ($Pipelines.Count -lt $MaxPipelines -and $Queue.Count) {Add-Pipeline} 

    # Loop through the pipelines and pass their output to the pipeline until they are finished
    while ($Pipelines.Count) {
        Write-Progress 'Split-Job' "Queues: $($Pipelines.Count)" `
            -PercentComplete (100 - [Int]($Queue.Count)/$QueueLength*100)
        # Iterate over a copy, because the loop body adds to and removes from $Pipelines
        foreach ($Pipeline in (New-Object System.Collections.ArrayList(,$Pipelines))) {
            if ( -not $Pipeline.Output.EndOfPipeline -or -not $Pipeline.Error.EndOfPipeline ) {
                $Pipeline.Output.NonBlockingRead()
                $Pipeline.Error.NonBlockingRead() | Write-Error
            } else {
                if ($Pipeline.PipelineStateInfo.State -eq 'Failed') {
                    Write-Error $Pipeline.PipelineStateInfo.Reason
                    # Start a new runspace, unless there was a syntax error in the scriptblock
                    if ($Queue.Count -lt $QueueLength) {Add-Pipeline}
                }
                Remove-Pipeline $Pipeline
            }
        }
        Start-Sleep -Milliseconds 100
    }
}

Split-Job: make your PC work harder

When you need to run a simple process or gather (WMI) data from many machines, you need a lot of patience. Or, you can divide and conquer using multiple PowerShell runspaces. There are many ingenious scripts available on the web that allow us to launch and manage background processes (even for PS v1). For my purposes, I found the necessary inspiration in a blog by Gaurhoth. His New-TaskPool script allows us to run multiple instances of a script block concurrently. Very cool!

The following script is a little different in that it pipes the data to each pipeline using a shared thread-safe queue rather than start a new pipeline for each input object. This reduces overhead and allows scripts that have a begin/process/end block to run more efficiently.

For instance, take this simple data gathering exercise:

Get-Content machines.txt | foreach {Get-WmiObject Win32_ComputerSystem -ComputerName $_} | Export-Csv ComputerInfo.csv

If you have a few hundred machines, this can take forever (especially if some machines are offline). Now replace the foreach alias with the Split-Job function:

Get-Content machines.txt | Split-Job {Get-WmiObject Win32_ComputerSystem -ComputerName $_} | Export-Csv ComputerInfo.csv

It will create 10 runspaces and run the WMI query concurrently, so this should be almost 10x faster. Even if one of the pipelines stalls, the others will keep going. If you already have some data gathering script that accepts pipeline input, you can just drop Split-Job in:

Get-Content machines.txt | Split-Job .\MachineReport.ps1 | Export-Csv MachineReport.csv

Note that the position of Split-Job in the pipeline matters: it collects all of its input into the queue before it starts any runspaces, so the command preceding it should be quick, e.g. getting objects from a text file, AD, SQL etc.

This is a work in progress and I will post more about this in the following weeks. In the meantime, comments and suggestions are welcome!

Arnoud

[UPDATE] The latest version can be found here: http://www.jansveld.net/powershell/2008/06/split-job-092/

 

#requires -version 1.0
###################################################################################################
## Run commands in multiple concurrent pipelines
##   by Arnoud Jansveld
## Version History
## 0.9    Includes logic to distinguish between scriptblocks and cmdlets or scripts. If a ScriptBlock
##        is specified, a foreach {} wrapper is added
## 0.8    Adds a progress bar
## 0.7    Stop adding runspaces if the queue is already empty
## 0.6    First version. Inspired by Gaurhoth's New-TaskPool script
###################################################################################################

function Split-Job (
    $Scriptblock = $(throw 'You must specify a command or script block!'),
    [int]$MaxPipelines=10
) {
    # Run a command, script or script block concurrently in several runspaces.
    # All pipeline input is first collected into one shared, synchronized queue;
    # every runspace then dequeues objects from it and pipes them into $Scriptblock.
    #
    # Parameters:
    #   Scriptblock   Command text or script block (mandatory; throws when omitted).
    #                 A script block is wrapped in "foreach {...}", so it is run once
    #                 per object with $_ set; anything else must accept pipeline input.
    #   MaxPipelines  Upper limit for the number of concurrent pipelines (default 10).
    #
    # Output: the objects read from the runspaces' output streams; errors read from
    # the runspaces are passed to Write-Error.

    # Create the shared thread-safe queue and fill it with the input objects
    $Queue = [System.Collections.Queue]::Synchronized([System.Collections.Queue]@($Input))
    $QueueLength = $Queue.Count
    # Do not create more runspaces than input objects
    if ($MaxPipelines -gt $QueueLength) {$MaxPipelines = $QueueLength}
    # Set up the script to be run by each pipeline
    if ($Scriptblock -is [ScriptBlock]) {$Scriptblock = "foreach {$Scriptblock}"}
    # The trap covers the case where another runspace empties the queue between
    # the Count check and the Dequeue call
    $Script = '$Queue = $($Input); & {trap {continue}; while ($Queue.Count) {$Queue.Dequeue()}} | ' + $Scriptblock
    # Create an array to keep track of the set of pipelines
    $Pipelines = New-Object System.Collections.ArrayList

    function Add-Pipeline {
        # This creates a new runspace and starts an asynchronous pipeline with our script.
        # It will automatically start processing objects from the shared queue.
        $Runspace = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace($Host)
        $Runspace.Open()
        $PipeLine = $Runspace.CreatePipeline($Script)
        # Write the queue itself as the single input object of the pipeline
        $Null = $Pipeline.Input.Write($Queue)
        $Pipeline.Input.Close()
        $PipeLine.InvokeAsync()
        $Null = $Pipelines.Add($Pipeline)
    }

    function Remove-Pipeline ($Pipeline) {
        # Remove a pipeline and runspace when it is done
        $Pipeline.RunSpace.Close()
        $Pipeline.Dispose()
        $Pipelines.Remove($Pipeline)
    }

    # Start the pipelines. The condition is tested before the first runspace is
    # created, so nothing is started (and the progress calculation below never
    # divides by a queue length of zero) when there is no input at all.
    while ($Pipelines.Count -lt $MaxPipelines -and $Queue.Count) {Add-Pipeline}

    # Loop through the pipelines and pass their output to the pipeline until they are finished
    while ($Pipelines.Count) {
        Write-Progress 'Split-Job' "Queues: $($Pipelines.Count)" `
            -PercentComplete (100 - [Int]($Queue.Count)/$QueueLength*100)
        # Iterate over a copy, because the loop body removes entries from $Pipelines
        foreach ($Pipeline in (New-Object System.Collections.ArrayList(,$Pipelines))) {
            if ( -not $Pipeline.Output.EndOfPipeline -or -not $Pipeline.Error.EndOfPipeline ) {
                $Pipeline.Output.NonBlockingRead()
                $Pipeline.Error.NonBlockingRead() | foreach {Write-Error $_}
            } else {
                if ($Pipeline.PipelineStateInfo.State -eq 'Failed') {
                    Write-Error $Pipeline.PipelineStateInfo.Reason
                }
                Remove-Pipeline $Pipeline
            }
        }
        Start-Sleep -Milliseconds 100
    }
}