File "Queue_Processor.php"
Full Path: /home/londdqdw/public_html/06/wp-content/plugins/the-events-calendar/src/Tribe/Aggregator/Record/Queue_Processor.php
File size: 12.28 KB
MIME-type: text/x-php
Charset: utf-8
<?php
use Tribe\Events\Aggregator\Record\Batch_Queue;
class Tribe__Events__Aggregator__Record__Queue_Processor {
/**
* Which Action will be triggered by the recurring Cron event.
*
* process_queue() is bound to this hook and the recurring event is
* scheduled/cleared under this name.
*
* @var string
*/
public static $scheduled_key = 'tribe_aggregator_process_insert_records';
/**
* Which Action will be triggered as a single Cron event
*
* process_queue() schedules an event on this hook when queue items are
* still stored for the current record after a run.
*
* @since 4.5.9
*
* @var string
*/
public static $scheduled_single_key = 'tribe_aggregator_single_process_insert_records';
/**
* Number of items to be processed in a single batch.
*
* This static value is overwritten by both process_batch() and process_queue()
* each time they run.
*
* @var int
*/
public static $batch_size = 5;
/**
* Number of items to be processed in a single small batch.
*
* Used by process_batch() as the default size when none is passed.
*
* @var int
*/
public static $small_batch_size = 5;
/**
* Number of items in the current batch processed so far.
*
* Incremented by do_processing() and compared against the batch size
* by batch_complete().
*
* @var int
*/
protected $processed = 0;
/**
* Post ID of the import record currently being handled; 0 when no record
* is waiting.
*
* @var int
*/
protected $current_record_id = 0;
/**
* The queue built for the current record, or the one injected with
* set_current_queue().
*
* @var Tribe__Events__Aggregator__Record__Queue_Interface
*/
public $current_queue;
/**
 * Hooks the processor into WordPress.
 *
 * Nothing is scheduled here: the cron setup is deferred to the `init` action.
 */
public function __construct() {
	$on_init = [ $this, 'action_init' ];

	add_action( 'init', $on_init );
}
/**
 * Callback for the `init` action: sets up the scheduled processing task.
 */
public function action_init() {
	$this->manage_scheduled_task();
}
/**
 * Wires up the "background processing" of import record insertions/updates.
 *
 * Registers the blog-deactivation cleanup, binds process_queue() to both the
 * recurring and the single cron hooks (priority 20, no arguments passed) and
 * finally lets register_scheduled_task() schedule or clear the recurring event.
 */
public function manage_scheduled_task() {
	add_action( 'tribe_events_blog_deactivate', [ $this, 'clear_scheduled_task' ] );

	// Both cron hooks are served by the very same queue processing callback.
	foreach ( [ self::$scheduled_key, self::$scheduled_single_key ] as $cron_hook ) {
		add_action( $cron_hook, [ $this, 'process_queue' ], 20, 0 );
	}

	$this->register_scheduled_task();
}
/**
 * Makes sure the recurring cron event used to process batches of pending
 * import record inserts/updates is scheduled only when it is needed.
 *
 * The event is cleared, and nothing is scheduled, when the service API is
 * not available or when no record is waiting to be processed. If an event
 * is already scheduled it is left alone.
 */
public function register_scheduled_task() {
	// No API setup (e.g. no API key): there is nothing we could process.
	$api_unavailable = is_wp_error( tribe( 'events-aggregator.service' )->api() );

	// The record lookup is only made when the API is available; it yields either false or 0 when nothing is waiting.
	if ( $api_unavailable || ! $this->next_waiting_record( false, true ) ) {
		// Drop any event scheduled previously, it would have nothing to do.
		$this->clear_scheduled_task();

		return;
	}

	// One event is already scheduled: do not schedule another.
	if ( wp_next_scheduled( self::$scheduled_key ) ) {
		return;
	}

	/**
	 * Filter the interval at which to process import records.
	 *
	 * By default a custom interval of every 15mins is specified, however
	 * other intervals such as "hourly", "twicedaily" and "daily" can
	 * normally be substituted.
	 *
	 * @see wp_schedule_event()
	 * @see 'cron_schedules'
	 */
	$recurrence = apply_filters( 'tribe_aggregator_record_processor_interval', 'tribe-every15mins' );

	wp_schedule_event( time(), $recurrence, self::$scheduled_key );
}
/**
 * Clears the cron events scheduled on the recurring processing hook.
 *
 * Hooked to `tribe_events_blog_deactivate` and also called by
 * register_scheduled_task() when there is nothing to process. Events
 * scheduled on the single-run hook are not touched.
 */
public function clear_scheduled_task() {
	$recurring_hook = self::$scheduled_key;

	wp_clear_scheduled_hook( $recurring_hook );
}
/**
 * Process a batch of queued items for a specific import record.
 *
 * This is typically used when processing a small number of instances immediately upon
 * an import record queue being updated for a particular import record, or to facilitate
 * batches being updated via an ajax update loop.
 *
 * When no batch size is passed, the number of items processed defaults to
 * self::$small_batch_size (5 unless changed), which can be overridden using the
 * `tribe_aggregator_small_batch_size` filter hook.
 *
 * Note that the resolved size is stored in the static self::$batch_size.
 *
 * @param int      $record_id  ID of the import record post whose queue should be processed.
 * @param int|null $batch_size Optional. Number of items to process; null to use the filtered default.
 */
public function process_batch( $record_id, $batch_size = null ) {
	/**
	 * Sets the default number of instances to be immediately processed when a record has items to insert
	 *
	 * @param int $small_batch_size
	 */
	$default_batch_size = (int) apply_filters( 'tribe_aggregator_small_batch_size', self::$small_batch_size );

	// Whatever its source, the batch size is always stored as an integer.
	self::$batch_size        = ( null === $batch_size ) ? $default_batch_size : (int) $batch_size;
	$this->current_record_id = (int) $record_id;

	$this->do_processing();
}
/**
 * Processes the next waiting batch of Import Record posts, if there are any.
 *
 * Records with a waiting queue are processed one after the other until none
 * is left or do_processing() reports it cannot continue. If queue items are
 * still stored for the current record afterwards, a single cron event is
 * scheduled to pick the work up again.
 *
 * @param int|null $batch_size Optional. Number of items to process; null to use the
 *                             filtered default. The cron hooks call this without arguments.
 */
public function process_queue( $batch_size = null ) {
	if ( null !== $batch_size ) {
		self::$batch_size = (int) $batch_size;
	} else {
		/**
		 * Controls the size of each batch processed by default (ie, during cron updates of record
		 * inserts/updates).
		 *
		 * @param int $default_batch_size
		 */
		self::$batch_size = (int) apply_filters( 'tribe_aggregator_batch_size', self::$batch_size );
	}

	// Keep going while a record is waiting and processing is able to continue.
	while ( $this->next_waiting_record() && $this->do_processing() ) {
		continue;
	}

	$queue_meta_key = Tribe__Events__Aggregator__Records::instance()->prefix_meta( Tribe__Events__Aggregator__Record__Queue::$queue_key );
	$remaining      = get_post_meta( $this->current_record_id, $queue_meta_key, true );

	// We are done with this batch: nothing left on the queue means nothing more to schedule.
	if ( empty( $remaining ) ) {
		return;
	}

	// More items are waiting: schedule a single Cron Event to happen ASAP on the dedicated hook.
	// Note: by default WordPress will not schedule the same single event twice within 10 minutes.
	wp_schedule_single_event( time(), self::$scheduled_single_key );
}
/**
 * Sets the queue the processor should treat as the current one.
 *
 * @param Tribe__Events__Aggregator__Record__Queue_Interface $queue The queue to store on the processor.
 */
public function set_current_queue( Tribe__Events__Aggregator__Record__Queue_Interface $queue ) {
	$this->current_queue = $queue;
}
/**
 * Obtains the post ID of the next record which has a queue of items in need
 * of processing.
 *
 * If no records in need of further processing can be found it will return a
 * falsy value: 0 after a database lookup, false when the cached answer is
 * "nothing waiting".
 *
 * When a record is found (or a positive cached ID is used) its ID is also stored
 * as the current record ID; a fresh lookup that finds nothing resets it to 0.
 *
 * @since 5.3.0 Inclusion of a $cache param for performance purposes.
 *
 * @param boolean $interactive_only Whether or not we should look for imports that were kicked off interactively
 * @param boolean $cache            When checking on every request we should utilize transient caching to prevent
 *                                  hitting the DB every time. Answers are cached for 5 minutes.
 *
 * @return boolean|integer
 */
public function next_waiting_record( $interactive_only = false, $cache = false ) {
	$use_cache     = ( true === $cache );
	$transient_key = 'tribe-event-aggregator-next_waiting_record' . ( $interactive_only ? '_interactive_only' : '' );

	if ( $use_cache ) {
		$cached = get_transient( $transient_key );

		// get_transient() returns false when nothing is cached; any other value is a cached answer.
		if ( false !== $cached ) {
			if ( empty( $cached ) ) {
				// An empty cached answer means a recent lookup found nothing waiting.
				return false;
			}

			// The database-backed transient store returns strings: normalize to an integer ID.
			return $this->current_record_id = (int) $cached;
		}
	}

	$args = [
		'post_type'      => Tribe__Events__Aggregator__Records::$post_type,
		'post_status'    => 'any',
		'posts_per_page' => 1,
		'meta_query'     => [
			[
				'key'     => Tribe__Events__Aggregator__Record__Abstract::$meta_key_prefix . Tribe__Events__Aggregator__Record__Queue::$queue_key,
				'compare' => 'EXISTS',
			],
		],
	];

	if ( $interactive_only ) {
		$args['meta_query'][] = [
			'key'     => Tribe__Events__Aggregator__Record__Abstract::$meta_key_prefix . 'interactive',
			'compare' => 'EXISTS',
		];
	}

	$waiting_records = get_posts( $args );

	if ( empty( $waiting_records ) ) {
		if ( $use_cache ) {
			// Cache the "nothing waiting" answer for 5 minutes. 0 is stored rather than null because
			// null is not reliably handed back by the transient storage, while 0 stays distinguishable
			// from the false returned on a cache miss.
			set_transient( $transient_key, 0, 5 * MINUTE_IN_SECONDS );
		}

		return $this->current_record_id = 0;
	}

	$next_record = array_shift( $waiting_records );

	if ( $use_cache ) {
		set_transient( $transient_key, $next_record->ID, 5 * MINUTE_IN_SECONDS );
	}

	return $this->current_record_id = $next_record->ID;
}
/**
 * Processes the current import record queue. May return boolean false if it is unable to continue.
 *
 * The queue is flagged as in progress for the duration of the processing and the
 * number of processed items is added to the running total of the batch.
 *
 * @return bool False when the batch limit has been reached, when no usable queue is
 *              waiting or when the queue is already being processed; true otherwise.
 */
protected function do_processing() {
	// The batch limit has been reached.
	if ( $this->batch_complete() ) {
		return false;
	}

	// Nothing is waiting in the queue.
	if ( ! $this->get_current_queue() ) {
		return false;
	}

	// The queue is actively being processed by a concurrent request/scheduled task.
	if ( $this->current_queue->is_in_progress() ) {
		return false;
	}

	$this->current_queue->set_in_progress_flag();

	$result = $this->current_queue->process( self::$batch_size );

	// In the 'fetch' phase the result will not be a Queue object and nothing is counted.
	if ( $result instanceof Tribe__Events__Aggregator__Record__Queue_Interface ) {
		$queue_type       = $this->current_queue->get_queue_type();
		$this->processed += $result->activity->count( $queue_type );
	}

	$this->current_queue->clear_in_progress_flag();

	return true;
}
/**
 * Builds the queue for the current record and tells whether it can be processed.
 *
 * The built queue is stored in the `current_queue` property. A queue that is stuck
 * or has errors is killed.
 *
 * @return bool True if a non-empty queue exists for the current record; false if the
 *              queue could not be built, was killed or is empty.
 */
protected function get_current_queue() {
	try {
		$this->current_queue = self::build_queue( $this->current_record_id );
	} catch ( InvalidArgumentException $exception ) {
		$message = sprintf(
			__( 'Could not process queue for Import Record %1$d: %2$s', 'the-events-calendar' ),
			$this->current_record_id,
			$exception->getMessage()
		);

		do_action( 'log', $message );

		return false;
	}

	$queue = $this->current_queue;

	if ( $queue->is_stuck() || $queue->has_errors() ) {
		$queue->kill_queue();

		return false;
	}

	return ! $queue->is_empty();
}
/**
 * Determines if the batch job is complete.
 *
 * The only criterion, for the time being, is the count of processed instances
 * reaching the batch size limit; a time based check could be added on top of it.
 *
 * @return bool Whether the number of processed items reached the batch size.
 */
protected function batch_complete() {
	$limit = self::$batch_size;

	return $this->processed >= $limit;
}
/**
 * Builds the correct class of queue.
 *
 * The legacy queue is forced when the `TRIBE_EA_QUEUE_USE_LEGACY` constant or
 * environment variable is truthy, when the `tribe_ea_queue_use_legacy` request
 * variable is truthy, or when the record is a CSV one. When the value passed in
 * does not resolve to a record a Void Queue carrying the error is returned instead.
 *
 * @since 4.6.16
 *
 * @param int|Tribe__Events__Aggregator__Record__Abstract $record     A record object or ID
 * @param array|string                                    $items      Either the record items to process or a
 *                                                                    string indicating a pre-process state.
 * @param bool                                            $use_legacy Whether to use the legacy queue processor or not.
 *
 * @return Tribe__Events__Aggregator__Record__Queue_Interface
 */
public static function build_queue( $record, $items = null, $use_legacy = false ) {
	$force_legacy = ( defined( 'TRIBE_EA_QUEUE_USE_LEGACY' ) && TRIBE_EA_QUEUE_USE_LEGACY )
		|| (bool) getenv( 'TRIBE_EA_QUEUE_USE_LEGACY' )
		|| (bool) tribe_get_request_var( 'tribe_ea_queue_use_legacy', false );

	if ( $force_legacy ) {
		$use_legacy = true;
	}

	if ( is_numeric( $record ) ) {
		$record = tribe( 'events-aggregator.records' )->get_by_post_id( $record );
	}

	if ( ! $record instanceof Tribe__Events__Aggregator__Record__Abstract ) {
		if ( $record instanceof WP_Error ) {
			return new Tribe__Events__Aggregator__Record__Void_Queue( $record );
		}

		// Keep the dynamic dump out of the translatable string, or the string could never be translated.
		return new Tribe__Events__Aggregator__Record__Void_Queue(
			sprintf(
				/* translators: %s: a dump of the value received in place of an import record. */
				__( 'There was an error building the record queue: %s', 'the-events-calendar' ),
				print_r( $record, true )
			)
		);
	}

	/** @var Tribe__Events__Aggregator__Settings $settings */
	$settings = tribe( 'events-aggregator.settings' );
	$class    = $settings->get_import_process_class();

	// Force the use of the Legacy Queue for CSV Imports
	if ( $record instanceof Tribe__Events__Aggregator__Record__CSV || $use_legacy ) {
		$class = Tribe__Events__Aggregator__Record__Queue::class;
	}

	// If the current Queue is a cron Queue or a Batch Queue.
	$is_batch_queue  = ( Tribe__Events__Aggregator__Record__Queue::class === $class || Batch_Queue::class === $class );
	$use_batch_queue = ( $use_legacy || $is_batch_queue );

	// Records explicitly allowing batch pushing are handled by the Batch Queue.
	if (
		$use_batch_queue
		&& ! empty( $record->meta )
		&& ! empty( $record->meta['allow_batch_push'] )
		&& tribe_is_truthy( $record->meta['allow_batch_push'] )
	) {
		$class = Batch_Queue::class;
	}

	/**
	 * Filters the class of the queue that should be used.
	 *
	 * This filter can also return a fully built queue object.
	 *
	 * @since 4.6.16
	 *
	 * @param string                                      $class  The import process class that will be used to process
	 *                                                            import records.
	 * @param Tribe__Events__Aggregator__Record__Abstract $record The current record being processed.
	 * @param array|string                                $items  Either an array of the record items to process or a string
	 *                                                            to indicate pre-process states like fetch or on-hold.
	 */
	$class = apply_filters( 'tribe_aggregator_queue_class', $class, $record, $items );

	// The filter may hand back an already built queue: use it as it is.
	if ( $class instanceof Tribe__Events__Aggregator__Record__Queue_Interface ) {
		return $class;
	}

	return new $class( $record, $items );
}
}