1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
| use swoole_process as SwooleProcess;
interface Processer { public function run(SwooleProcess $woker); } class ProcessManager {
protected static $processMap = [];
protected static function success($data = [], $message = "success", $code = 0) { return json_encode(['code' => $code, 'message' => $message, 'data' => $data]); }
protected static function error($code = 1, $message = "error", $data = []) { return self::success($data, $message, $code); }
public static function submitTask($KEY, Processer $task) { $process = new SwooleProcess(function (SwooleProcess $worker) use ($task) { try { $respData = $task->run($worker); $worker->push(static::success($respData)); $worker->exit(0); } catch (\Exception $e) { $worker->push(static::error($e->getCode(), $e->getMessage())); $worker->exit($e->getCode()); } }); $process->useQueue(crc32($KEY)); $pid = $process->start(); static::$processMap[$KEY] = array_merge(self::$processMap[$KEY], [ $pid => $process ]); }
public static function waitResp($KEY, $interrupt = false) { $respData = []; if (!isset(static::$processMap[$KEY])) { throw new \Exception(sprintf('对应进程组: %s, 不存在!', $KEY)); } $processList = self::$processMap[$KEY]; foreach ($processList as $pid => $process) { SwooleProcess::wait(); $result = json_decode($process->pop(), true); if ($result['code'] && $interrupt) { $process->freeQueue(); throw new \Exception($result['message'], $result['code']); } $respData[] = $result; } unset(self::$processMap[$KEY]); return $respData; }
}
class TestJob implements Processer {
public function run(SwooleProcess $woker) { sleep(5); $random = rand(1, 100); if ($random % 5 == 0) { throw new \Exception('timeout error ! code: ' . $random, $random); } return $random; } }
ProcessManager::submitTask("a", new TestJob()); ProcessManager::submitTask("a", new TestJob()); ProcessManager::submitTask("a", new TestJob()); ProcessManager::submitTask("a", new TestJob()); ProcessManager::submitTask("a", new TestJob()); ProcessManager::submitTask("a", new TestJob()); ProcessManager::submitTask("a", new TestJob());
$result = ProcessManager::waitResp("a"); var_dump($result);
|