sequence
This article mainly studies PowerJob's AbstractScriptProcessor
AbstractScriptProcessor
tech/powerjob/official/processors/impl/script/
@Slf4j public abstract class AbstractScriptProcessor extends CommonBasicProcessor { private static final ForkJoinPool POOL = new ForkJoinPool(4 * ().availableProcessors()); private static final Set<String> DOWNLOAD_PROTOCOL = ("http", "https", "ftp"); protected static final String SH_SHELL = "/bin/sh"; protected static final String CMD_SHELL = ""; private static final String WORKER_DIR = () + "/official_script_processor/"; @Override protected ProcessResult process0(TaskContext context) throws Exception { OmsLogger omsLogger = (); String scriptParams = (context); ("[SYSTEM] ScriptProcessor start to process, params: {}", scriptParams); if (scriptParams == null) { String message = "[SYSTEM] ScriptParams is null, please check jobParam configuration."; (message); return new ProcessResult(false, message); } String scriptPath = prepareScriptFile((), scriptParams); ("[SYSTEM] Generate executable file successfully, path: {}", scriptPath); if (SystemUtils.IS_OS_WINDOWS) { if ((getRunCommand(), SH_SHELL)) { String message = ("[SYSTEM] Current OS is %s where shell scripts cannot run.", SystemUtils.OS_NAME); (message); return new ProcessResult(false, message); } } // Authorization if ( !SystemUtils.IS_OS_WINDOWS) { ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); // Waiting for return, it is impossible to cause deadlock here (the shell generates a large amount of data and may cause deadlock) ().waitFor(); ("[SYSTEM] chmod 755 authorization complete, ready to start execution~"); } // 2. Execute the target script ProcessBuilder pb = (getRunCommand(), CMD_SHELL) ? new ProcessBuilder(getRunCommand(), "/c", scriptPath) : new ProcessBuilder(getRunCommand(), scriptPath); Process process = (); StringBuilder inputBuilder = new StringBuilder(); StringBuilder errorBuilder = new StringBuilder(); boolean success = true; String result; final Charset charset = getCharset(); try (InputStream is = (); InputStream es = ()) { (() -> copyStream(is, inputBuilder, omsLogger, charset)); (() -> copyStream(es, errorBuilder, omsLogger, charset)); success = () == 0; } catch (InterruptedException ie) { ("[SYSTEM] ScriptProcessor has been interrupted"); } finally { result = ("[INPUT]: %s;[ERROR]: %s", (), ()); } return new ProcessResult(success, result); } /** * Generate script name * @param instanceId id of instance * @return File name */ protected abstract String getScriptName(Long instanceId); /** * Get the run command (eg, shell returns /bin/sh) * @return Command to execute script */ protected abstract String getRunCommand(); //...... }
AbstractScriptProcessor inherits CommonBasicProcessor, which defines a ForkJoinPool with parallelism of 4*().availableProcessors(); its process0 method first reads scriptParams, then executes prepareScriptFile to get scriptPath, then uses chmod to change script permission to 755, then obtains the command through getRunCommand, then submits copyStream to the pool, and waits for the process to return.
prepareScriptFile
private String prepareScriptFile(Long instanceId, String processorInfo) throws IOException { String scriptPath = WORKER_DIR + getScriptName(instanceId); File script = new File(scriptPath); if (()) { return scriptPath; } File dir = new File(()); boolean success = (); success = (); if (!success) { throw new RuntimeException("create script file failed"); } // If it is a download link, get it from the network for (String protocol : DOWNLOAD_PROTOCOL) { if ((protocol)) { (new URL(processorInfo), script, 5000, 300000); return scriptPath; } } final Charset charset = getCharset(); if(charset != null) { try (Writer fstream = new OutputStreamWriter((()), charset); BufferedWriter out = new BufferedWriter(fstream)) { (processorInfo); (); } } else { try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) { (processorInfo); (); } } return scriptPath; }
prepareScriptFile first get scriptPath through getScriptName. If it is a link to http, https, and ftp, it will be downloaded. Otherwise, write scriptParams to scriptPath
copyStream
private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) { String line; try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) { while ((line = ()) != null) { (line); // Synchronize to online logs (line); } } catch (Exception e) { ("[ScriptProcessor] copyStream failed.", e); ("[SYSTEM] copyStream failed.", e); ("Exception: ").append(e); } }
copyStream will read the InputStream to the StringBuilder and print it to the omsLogger at the same time
ShellProcessor
tech/powerjob/official/processors/impl/script/
public class ShellProcessor extends AbstractScriptProcessor { @Override protected String getScriptName(Long instanceId) { return ("shell_%", instanceId); } @Override protected String getRunCommand() { return SH_SHELL; } }
The getScriptName of ShellProcessor is generated based on shell_% and instanceId; its getRunCommand is /bin/sh
CMDProcessor
tech/powerjob/official/processors/impl/script/
public class CMDProcessor extends AbstractScriptProcessor { @Override protected String getScriptName(Long instanceId) { return ("cmd_%", instanceId); } @Override protected String getRunCommand() { return ""; } @Override protected Charset getCharset() { return (); } }
The getScriptName of CMDProcessor is generated based on cmd_% and instanceId, and its getRunCommand is`
PowerShellProcessor
tech/powerjob/official/processors/impl/script/
public class PowerShellProcessor extends AbstractScriptProcessor { @Override protected String getScriptName(Long instanceId) { return ("powershell_%d.ps1", instanceId); } @Override protected String getRunCommand() { return ""; } @Override protected Charset getCharset() { return (); } }
The getScriptName of PowerShellProcessor is generated based on powershell_%d.ps1" and instanceId, and its getRunCommand is
PythonProcessor
tech/powerjob/official/processors/impl/script/
public class PythonProcessor extends AbstractScriptProcessor { @Override protected String getScriptName(Long instanceId) { return ("python_%", instanceId); } @Override protected String getRunCommand() { return "python"; } }
PythonProcessor's getScriptName is generated based on python_% and instanceId, and its getRunCommand is python
summary
AbstractScriptProcessor inherits CommonBasicProcessor, which has four implementation classes, namely ShellProcessor, CMDProcessor, PowerShellProcessor, and PythonProcessor; it defines the getScriptName and getRunCommand abstract methods; its process0 method mainly writes scriptParams to a local file (scriptParams is http, https, and ftp, and downloads according to the URL), then modify the permission to 755, then execute(), then collects the input and errorStream into StringBuilder and prints to omsLogger, and finally () waits for the processing to be completed.
The above is the detailed interpretation of the source code of PowerJob's AbstractScriptProcessor method workflow. For more information about PowerJob AbstractScriptProcessor, please follow my other related articles!