Created
November 28, 2025 14:09
-
-
Save AlexeyKosov/c45d7354e8488d7498c641b96ac13f14 to your computer and use it in GitHub Desktop.
Runs all scheduler transports in a single messenger:consume worker
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
| declare(strict_types=1); | |
| namespace App\Command\Schedule; | |
| use App\Command\ExclusiveCommand; | |
| use Psr\Container\ContainerInterface; | |
| use Symfony\Component\Console\Attribute\AsCommand; | |
| use Symfony\Component\Console\Command\Command; | |
| use Symfony\Component\Console\Input\ArrayInput; | |
| use Symfony\Component\Console\Input\InputInterface; | |
| use Symfony\Component\Console\Output\OutputInterface; | |
| use Symfony\Component\Console\Style\SymfonyStyle; | |
| use Symfony\Component\DependencyInjection\Attribute\Autowire; | |
/**
 * Console command that consumes every scheduler transport in one worker.
 *
 * It lists the service names exposed by the injected receiver locator, keeps
 * the ones whose name starts with "scheduler_", and runs "messenger:consume"
 * with that list as its "receivers" argument.
 *
 * NOTE(review): the description passed to #[AsCommand] differs from the one set
 * in configure(). Presumably the attribute text is what lazy command listings
 * show while the configure() text wins once the command is instantiated —
 * confirm which wording is intended and keep only one.
 */
#[AsCommand(name: 'schedule:run', description: 'Runs scheduled commands')]
class RunCommand extends ExclusiveCommand
{
    /** Name prefix used to recognise scheduler transports among all receivers. */
    private const TRANSPORT_PREFIX = 'scheduler_';

    /**
     * @param ContainerInterface $receiverLocator Locator of Messenger receivers. It must also implement
     *                                            \Symfony\Contracts\Service\ServiceProviderInterface,
     *                                            because the receiver names are read from it.
     */
    public function __construct(
        #[Autowire('@messenger.receiver_locator')]
        private ContainerInterface $receiverLocator,
    ) {
        parent::__construct();
    }

    protected function configure(): void
    {
        $this->setDescription('Runs all scheduler transports in a single messenger:consume worker.');
    }

    /**
     * Prints the scheduler transports that were found and hands them to "messenger:consume".
     *
     * @return int Command::SUCCESS when there is nothing to consume, otherwise the exit code
     *             returned by the "messenger:consume" command.
     *
     * @throws \LogicException   When the receiver locator cannot enumerate its services.
     * @throws \RuntimeException When the command is not attached to a console application.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);

        $schedulerTransports = $this->findSchedulerTransports();
        if ($schedulerTransports === []) {
            $io->warning('No scheduler_* transports found (no schedules registered?).');

            return Command::SUCCESS;
        }

        $io->text('Running scheduler transports:');
        foreach ($schedulerTransports as $transport) {
            $io->text(' - '.$transport);
        }

        $application = $this->getApplication();
        if ($application === null) {
            throw new \RuntimeException('Console application is not available.');
        }

        $command = $application->find('messenger:consume');
        $consumeInput = new ArrayInput([
            'command' => 'messenger:consume',
            'receivers' => $schedulerTransports,
        ]);

        $io->newLine();
        $io->text('Starting messenger:consume ...');

        return $command->run($consumeInput, $output);
    }

    /**
     * Returns the names of all receivers whose name starts with the scheduler prefix.
     *
     * @return list<string>
     *
     * @throws \LogicException When the receiver locator does not implement ServiceProviderInterface.
     */
    private function findSchedulerTransports(): array
    {
        // PSR-11 only guarantees get()/has(); enumerating service names needs the
        // Symfony service-provider contract, so fail with a clear message without it.
        if (!$this->receiverLocator instanceof \Symfony\Contracts\Service\ServiceProviderInterface) {
            throw new \LogicException(sprintf(
                'The receiver locator must implement "%s" to list transports, "%s" given.',
                \Symfony\Contracts\Service\ServiceProviderInterface::class,
                get_debug_type($this->receiverLocator),
            ));
        }

        $allReceivers = array_keys($this->receiverLocator->getProvidedServices());

        // array_filter() preserves keys; array_values() turns the result back into a list.
        return array_values(array_filter(
            $allReceivers,
            static fn (string $name): bool => str_starts_with($name, self::TRANSPORT_PREFIX),
        ));
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment