SoFunction
Updated on 2025-03-08

PowerJob's AbstractScriptProcessor implementation class workflow source code interpretation

sequence

This article mainly studies PowerJob's AbstractScriptProcessor

AbstractScriptProcessor

tech/powerjob/official/processors/impl/script/

@Slf4j
public abstract class AbstractScriptProcessor extends CommonBasicProcessor {
    private static final ForkJoinPool POOL = new ForkJoinPool(4 * ().availableProcessors());
    private static final Set<String> DOWNLOAD_PROTOCOL = ("http", "https", "ftp");
    protected static final String SH_SHELL = "/bin/sh";
    protected static final String CMD_SHELL = "";
    private static final String WORKER_DIR = () + "/official_script_processor/";
    @Override
    protected ProcessResult process0(TaskContext context) throws Exception {
        OmsLogger omsLogger = ();
        String scriptParams = (context);
        ("[SYSTEM] ScriptProcessor start to process, params: {}", scriptParams);
        if (scriptParams == null) {
            String message = "[SYSTEM] ScriptParams is null, please check jobParam configuration.";
            (message);
            return new ProcessResult(false, message);
        }
        String scriptPath = prepareScriptFile((), scriptParams);
        ("[SYSTEM] Generate executable file successfully, path: {}", scriptPath);
        if (SystemUtils.IS_OS_WINDOWS) {
            if ((getRunCommand(), SH_SHELL)) {
                String message = ("[SYSTEM] Current OS is %s where shell scripts cannot run.", SystemUtils.OS_NAME);
                (message);
                return new ProcessResult(false, message);
            }
        }
        // Authorization        if  ( !SystemUtils.IS_OS_WINDOWS) {
            ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath);
            // Waiting for return, it is impossible to cause deadlock here (the shell generates a large amount of data and may cause deadlock)            ().waitFor();
            ("[SYSTEM] chmod 755 authorization complete, ready to start execution~");
        }
        // 2. Execute the target script        ProcessBuilder pb = (getRunCommand(), CMD_SHELL) ?
                new ProcessBuilder(getRunCommand(), "/c", scriptPath)
                : new ProcessBuilder(getRunCommand(), scriptPath);
        Process process = ();
        StringBuilder inputBuilder = new StringBuilder();
        StringBuilder errorBuilder = new StringBuilder();
        boolean success = true;
        String result;
        final Charset charset = getCharset();
        try (InputStream is = (); InputStream es = ()) {
            (() -> copyStream(is, inputBuilder, omsLogger, charset));
            (() -> copyStream(es, errorBuilder, omsLogger, charset));
            success = () == 0;
        } catch (InterruptedException ie) {
            ("[SYSTEM] ScriptProcessor has been interrupted");
        } finally {
            result = ("[INPUT]: %s;[ERROR]: %s", (), ());
        }
        return new ProcessResult(success, result);
    }
    /**
      * Generate script name
      * @param instanceId id of instance
      * @return File name
      */
    protected abstract String getScriptName(Long instanceId);
    /**
      * Get the run command (eg, shell returns /bin/sh)
      * @return Command to execute script
      */
    protected abstract String getRunCommand();
    //......
}
AbstractScriptProcessor inherits CommonBasicProcessor, which defines a ForkJoinPool with parallelism of 4*().availableProcessors(); its process0 method first reads scriptParams, then executes prepareScriptFile to get scriptPath, then uses chmod to change script permission to 755, then obtains the command through getRunCommand, then submits copyStream to the pool, and waits for the process to return.

prepareScriptFile

private String prepareScriptFile(Long instanceId, String processorInfo) throws IOException {
        String scriptPath = WORKER_DIR + getScriptName(instanceId);
        File script = new File(scriptPath);
        if (()) {
            return scriptPath;
        }
        File dir = new File(());
        boolean success = ();
        success = ();
        if (!success) {
            throw new RuntimeException("create script file failed");
        }

        // If it is a download link, get it from the network        for (String protocol : DOWNLOAD_PROTOCOL) {
            if ((protocol)) {
                (new URL(processorInfo), script, 5000, 300000);
                return scriptPath;
            }
        }

        final Charset charset = getCharset();

        if(charset != null)
        {
            try (Writer fstream = new OutputStreamWriter((()), charset); BufferedWriter out = new BufferedWriter(fstream)) {
                (processorInfo);
                ();
            }
        }
        else {
            try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) {
                (processorInfo);
                ();
            }
        }
        return scriptPath;
    }
prepareScriptFile first get scriptPath through getScriptName. If it is a link to http, https, and ftp, it will be downloaded. Otherwise, write scriptParams to scriptPath

copyStream

private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) {
        String line;
        try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) {
            while ((line = ()) != null) {
                (line);
                // Synchronize to online logs                (line);
            }
        } catch (Exception e) {
            ("[ScriptProcessor] copyStream failed.", e);
            ("[SYSTEM] copyStream failed.", e);

            ("Exception: ").append(e);
        }
    }
copyStream will read the InputStream to the StringBuilder and print it to the omsLogger at the same time

ShellProcessor

tech/powerjob/official/processors/impl/script/

public class ShellProcessor extends AbstractScriptProcessor {
    @Override
    protected String getScriptName(Long instanceId) {
        return ("shell_%", instanceId);
    }
    @Override
    protected String getRunCommand() {
        return SH_SHELL;
    }
}
The getScriptName of ShellProcessor is generated based on shell_% and instanceId; its getRunCommand is /bin/sh

CMDProcessor

tech/powerjob/official/processors/impl/script/

public class CMDProcessor extends AbstractScriptProcessor {
    @Override
    protected String getScriptName(Long instanceId) {
        return ("cmd_%", instanceId);
    }
    @Override
    protected String getRunCommand() {
        return "";
    }
    @Override
    protected Charset getCharset() {
        return ();
    }
}
The getScriptName of CMDProcessor is generated based on cmd_% and instanceId, and its getRunCommand is`

PowerShellProcessor

tech/powerjob/official/processors/impl/script/

public class PowerShellProcessor extends AbstractScriptProcessor {

    @Override
    protected String getScriptName(Long instanceId) {
        return ("powershell_%d.ps1", instanceId);
    }

    @Override
    protected String getRunCommand() {
        return "";
    }

    @Override
    protected Charset getCharset() {
        return ();
    }
}
The getScriptName of PowerShellProcessor is generated based on powershell_%d.ps1" and instanceId, and its getRunCommand is

PythonProcessor

tech/powerjob/official/processors/impl/script/

public class PythonProcessor extends AbstractScriptProcessor {

    @Override
    protected String getScriptName(Long instanceId) {
        return ("python_%", instanceId);
    }

    @Override
    protected String getRunCommand() {
        return "python";
    }
}
PythonProcessor's getScriptName is generated based on python_% and instanceId, and its getRunCommand is python

summary

AbstractScriptProcessor inherits CommonBasicProcessor, which has four implementation classes, namely ShellProcessor, CMDProcessor, PowerShellProcessor, and PythonProcessor; it defines the getScriptName and getRunCommand abstract methods; its process0 method mainly writes scriptParams to a local file (scriptParams is http, https, and ftp, and downloads according to the URL), then modify the permission to 755, then execute(), then collects the input and errorStream into StringBuilder and prints to omsLogger, and finally () waits for the processing to be completed.

The above is the detailed interpretation of the source code of PowerJob's AbstractScriptProcessor method workflow. For more information about PowerJob AbstractScriptProcessor, please follow my other related articles!