Create New Item
Item Type
File
Folder
Item Name
Search file in folder and subfolders...
Are you sure want to rename?
File Manager
/
06
/
wp-content
/
plugins
/
the-events-calendar
/
src
/
Tribe
/
Aggregator
/
Record
:
Queue_Processor.php
Advanced Search
Upload
New Item
Settings
Back
Back Up
Advanced Editor
Save
<?php use Tribe\Events\Aggregator\Record\Batch_Queue; class Tribe__Events__Aggregator__Record__Queue_Processor { public static $scheduled_key = 'tribe_aggregator_process_insert_records'; /** * Which Action will be triggered as a single Cron event * * @since 4.5.9 * * @var string */ public static $scheduled_single_key = 'tribe_aggregator_single_process_insert_records'; /** *Number of items to be processed in a single batch. * * @var int */ public static $batch_size = 5; /** *Number of items to be processed in a single small batch. * * @var int */ public static $small_batch_size = 5; /** * Number of items in the current batch processed so far. * * @var int */ protected $processed = 0; /** * @var int */ protected $current_record_id = 0; /** * @var Tribe__Events__Aggregator__Record__Queue_Interface */ public $current_queue; public function __construct() { add_action( 'init', [ $this, 'action_init' ] ); } public function action_init() { $this->manage_scheduled_task(); } /** * Configures a scheduled task to handle "background processing" of import record insertions/updates. */ public function manage_scheduled_task() { add_action( 'tribe_events_blog_deactivate', [ $this, 'clear_scheduled_task' ] ); add_action( self::$scheduled_key, [ $this, 'process_queue' ], 20, 0 ); add_action( self::$scheduled_single_key, [ $this, 'process_queue' ], 20, 0 ); $this->register_scheduled_task(); } /** * Runs upon plugin activation, registering our scheduled task used to process * batches of pending import record inserts/updates. */ public function register_scheduled_task() { // Bail on registration of scheduled event in case we don't have an API setup. if ( is_wp_error( tribe( 'events-aggregator.service' )->api() ) ) { // Also clear in case we don't have an API key. $this->clear_scheduled_task(); return; } // Prevent from trying to schedule in case we don't have any scheduled records to process, value will either be false or 0. if ( ! $this->next_waiting_record( false, true ) ) { // Also clear in case we don't have any records to process. $this->clear_scheduled_task(); return; } // If we have one scheduled, don't schedule another. if ( wp_next_scheduled( self::$scheduled_key ) ) { return; } /** * Filter the interval at which to process import records. * * By default a custom interval of every 15mins is specified, however * other intervals such as "hourly", "twicedaily" and "daily" can * normally be substituted. * * @see wp_schedule_event() * @see 'cron_schedules' */ $interval = apply_filters( 'tribe_aggregator_record_processor_interval', 'tribe-every15mins' ); wp_schedule_event( time(), $interval, self::$scheduled_key ); } /** * Fires upon plugin deactivation. */ public function clear_scheduled_task() { wp_clear_scheduled_hook( self::$scheduled_key ); } /** * Process a batch of queued items for a specific import record. * * This is typically used when processing a small number of instances immediately upon * an import record queue being updated for a particular import record, or to facilitate * batches being updated via an ajax update loop. * * The default number of items processed in a single batch is 10, which can be * overridden using the tribe_events_aggregator_small_batch_size filter hook * * @param int $record_id * @param int $batch_size */ public function process_batch( $record_id, $batch_size = null ) { /** * Sets the default number of instances to be immediately processed when a record has items to insert * * @param int $small_batch_size */ $default_batch_size = apply_filters( 'tribe_aggregator_small_batch_size', self::$small_batch_size ); self::$batch_size = ( null === $batch_size ) ? $default_batch_size : (int) $batch_size; $this->current_record_id = (int) $record_id; $this->do_processing(); } /** * Processes the next waiting batch of Import Record posts, if there are any. * * @param int $batch_size */ public function process_queue( $batch_size = null ) { if ( null === $batch_size ) { /** * Controls the size of each batch processed by default (ie, during cron updates of record * inserts/updates). * * @param int $default_batch_size */ self::$batch_size = (int) apply_filters( 'tribe_aggregator_batch_size', self::$batch_size ); } else { self::$batch_size = (int) $batch_size; } while ( $this->next_waiting_record() ) { if ( ! $this->do_processing() ) { break; } } $queue_items = get_post_meta( $this->current_record_id, Tribe__Events__Aggregator__Records::instance()->prefix_meta( Tribe__Events__Aggregator__Record__Queue::$queue_key ), true ); // We only get here if we done processing this batch // Now we will check for more events on the queue if ( ! empty( $queue_items ) ) { // Schedule a Cron Event to happen ASAP, and flag it for searching and we need to make it unique // By default WordPress won't allow more than one Action to happen twice in 10 minutes wp_schedule_single_event( time(), self::$scheduled_single_key ); } } public function set_current_queue( Tribe__Events__Aggregator__Record__Queue_Interface $queue ) { $this->current_queue = $queue; } /** * Obtains the post ID of the next record which has a queue of items in need * of processing. * * If no records in need of further processing can be found it will return bool false. * * @since 5.3.0 Inclusion of a $cache param for performance purposes. * * @param boolean $interactive_only Whether or not we should look for imports that were kicked off interactively * @param boolean $cache When checking on every request we should utilize transient caching to prevent hitting the DB every time. * * @return boolean|integer */ public function next_waiting_record( $interactive_only = false, $cache = false ) { if ( true === $cache ) { $interactive_only_suffix = ''; if ( $interactive_only ) { $interactive_only_suffix = '_interactive_only'; } $transient_key = 'tribe-event-aggregator-next_waiting_record' . ( ! $interactive_only ? '' : '_interactive_only' ); $next_waiting_record = get_transient( $transient_key ); if ( ! empty( $next_waiting_record ) ) { return $this->current_record_id = $next_waiting_record; } elseif ( null === $next_waiting_record ) { // When not false we return false; } } $args = [ 'post_type' => Tribe__Events__Aggregator__Records::$post_type, 'post_status' => 'any', 'posts_per_page' => 1, 'meta_query' => [ [ 'key' => Tribe__Events__Aggregator__Record__Abstract::$meta_key_prefix . Tribe__Events__Aggregator__Record__Queue::$queue_key, 'compare' => 'EXISTS', ], ], ]; if ( $interactive_only ) { $args['meta_query'][] = [ 'key' => Tribe__Events__Aggregator__Record__Abstract::$meta_key_prefix . 'interactive', 'compare' => 'EXISTS', ]; } $waiting_records = get_posts( $args ); if ( empty( $waiting_records ) ) { // Set cache in case of usage. if ( true === $cache ) { // Setting to null prevents us from running for 5 minutes. set_transient( $transient_key, null, 5 * MINUTE_IN_SECONDS ); } return $this->current_record_id = 0; } $next_record = array_shift( $waiting_records ); // Set cache in case of usage. if ( true === $cache ) { set_transient( $transient_key, $next_record->ID, 5 * MINUTE_IN_SECONDS ); } return $this->current_record_id = $next_record->ID; } /** * Processes the current import record queue. May return boolean false if it is unable to continue. * * @return bool */ protected function do_processing() { // Bail out if the batch limit has been exceeded, if nothing is waiting in the queue // or the queue is actively being processed by a concurrent request/scheduled task if ( $this->batch_complete() || ! $this->get_current_queue() || $this->current_queue->is_in_progress() ) { return false; } $this->current_queue->set_in_progress_flag(); $processed = $this->current_queue->process( self::$batch_size ); // in the 'fetch' phase this will not be a Queue object if ( $processed instanceof Tribe__Events__Aggregator__Record__Queue_Interface ) { $this->processed += $processed->activity->count( $this->current_queue->get_queue_type() ); } $this->current_queue->clear_in_progress_flag(); return true; } /** * Returns true if a non-empty queue exists for the current record, else returns false. * * @return bool */ protected function get_current_queue() { try { $this->current_queue = self::build_queue( $this->current_record_id ); } catch ( InvalidArgumentException $e ) { do_action( 'log', sprintf( __( 'Could not process queue for Import Record %1$d: %2$s', 'the-events-calendar' ), $this->current_record_id, $e->getMessage() ) ); return false; } if ( $this->current_queue->is_stuck() || $this->current_queue->has_errors() ) { $this->current_queue->kill_queue(); return false; } return $this->current_queue->is_empty() ? false : true; } /** * Determines if the batch job is complete. * * Currently this is simply a measure of the number of instances processed against * the batch size limit - however it could potentially be expanded to include an * additional time based check. * * @return bool */ protected function batch_complete() { return ( $this->processed >= self::$batch_size ); } /** * Builds the correct class of queue. * * @since 4.6.16 * * @param int|Tribe__Events__Aggregator__Record__Abstract $record A record object or ID * @param array|string $items * @param bool $use_legacy Whether to use the legacy queue processor or not. * * @return Tribe__Events__Aggregator__Record__Queue_Interface */ public static function build_queue( $record, $items = null, $use_legacy = false ) { if ( ( defined( 'TRIBE_EA_QUEUE_USE_LEGACY' ) && TRIBE_EA_QUEUE_USE_LEGACY ) || (bool) getenv( 'TRIBE_EA_QUEUE_USE_LEGACY' ) || false !== (bool) tribe_get_request_var( 'tribe_ea_queue_use_legacy', false ) ) { $use_legacy = true; } if ( is_numeric( $record ) ) { $record = tribe( 'events-aggregator.records' )->get_by_post_id( $record ); } if ( ! $record instanceof Tribe__Events__Aggregator__Record__Abstract ) { if ( $record instanceof WP_Error ) { return new Tribe__Events__Aggregator__Record__Void_Queue( $record ); } return new Tribe__Events__Aggregator__Record__Void_Queue( __( 'There was an error building the record queue: ' . print_r( $record, true ) ) ); } /** @var Tribe__Events__Aggregator__Settings $settings */ $settings = tribe( 'events-aggregator.settings' ); $class = $settings->get_import_process_class(); // Force the use of the Legacy Queue for CSV Imports if ( $record instanceof Tribe__Events__Aggregator__Record__CSV || $use_legacy ) { $class = Tribe__Events__Aggregator__Record__Queue::class; } // If the current Queue is a cron Queue or a Batch Queue. $is_batch_queue = ( Tribe__Events__Aggregator__Record__Queue::class === $class || Batch_Queue::class === $class ); $use_batch_queue = ( $use_legacy || $is_batch_queue ); if ( $use_batch_queue && ! empty( $record->meta ) && ! empty( $record->meta['allow_batch_push'] ) && tribe_is_truthy( $record->meta['allow_batch_push'] ) ) { $class = Batch_Queue::class; } /** * Filters the class of the queue that should be used. * * This filter can also return a fully built queue object. * * @since 4.6.16 * * @param string $class The import process class that will be used to process * import records. * @param Tribe__Events__Aggregator__Record__Abstract $record The current record being processed. * @param array|string $items Either an array of the record items to process or a string * to indicate pre-process states like fetch or on-hold. */ $class = apply_filters( 'tribe_aggregator_queue_class', $class, $record, $items ); return $class instanceof Tribe__Events__Aggregator__Record__Queue_Interface ? $class : new $class( $record, $items ); } }