Вход Регистрация
Файл: vendor/laravel/framework/src/Illuminate/Bus/DatabaseBatchRepository.php
Строк: 456
<?php

namespace Illuminate\Bus;

use Carbon\CarbonImmutable;
use Closure;
use DateTimeInterface;
use Illuminate\Database\Connection;
use Illuminate\Database\PostgresConnection;
use Illuminate\Database\Query\Expression;
use Illuminate\Support\Str;

class DatabaseBatchRepository implements PrunableBatchRepository
{
    /**
     * The batch factory instance.
     *
     * @var \Illuminate\Bus\BatchFactory
     */
    protected $factory;

    /**
     * The database connection instance.
     *
     * @var \Illuminate\Database\Connection
     */
    protected $connection;

    /**
     * The database table to use to store batch information.
     *
     * @var string
     */
    protected $table;

    /**
     * Create a new batch repository instance.
     *
     * @param  \Illuminate\Bus\BatchFactory  $factory
     * @param  \Illuminate\Database\Connection  $connection
     * @param  string  $table
     */
    public function __construct(BatchFactory $factory, Connection $connection, string $table)
    {
        $this->factory = $factory;
        $this->connection = $connection;
        $this->table = $table;
    }

    /**
     * Retrieve a list of batches, ordered by ID descending.
     *
     * When $before is truthy, only batches whose ID is less than $before
     * are returned.
     *
     * @param  int  $limit
     * @param  mixed  $before
     * @return \Illuminate\Bus\Batch[]
     */
    public function get($limit = 50, $before = null)
    {
        return $this->connection->table($this->table)
            ->orderByDesc('id')
            ->take($limit)
            ->when($before, fn ($q) => $q->where('id', '<', $before))
            ->get()
            ->map(function ($batch) {
                return $this->toBatch($batch);
            })
            ->all();
    }

    /**
     * Retrieve information about an existing batch.
     *
     * Returns null (implicitly) when no row matches the given ID.
     *
     * @param  string  $batchId
     * @return \Illuminate\Bus\Batch|null
     */
    public function find(string $batchId)
    {
        // NOTE(review): the read is forced onto the write PDO, presumably so a
        // batch that was just stored is visible despite replica lag - confirm.
        $batch = $this->connection->table($this->table)
            ->useWritePdo()
            ->where('id', $batchId)
            ->first();

        if ($batch) {
            return $this->toBatch($batch);
        }
    }

    /**
     * Store a new pending batch.
     *
     * The row is inserted with all job counters at zero and an empty failed
     * job list, then read back through find().
     *
     * @param  \Illuminate\Bus\PendingBatch  $batch
     * @return \Illuminate\Bus\Batch
     */
    public function store(PendingBatch $batch)
    {
        $id = (string) Str::orderedUuid();

        $this->connection->table($this->table)->insert([
            'id' => $id,
            'name' => $batch->name,
            'total_jobs' => 0,
            'pending_jobs' => 0,
            'failed_jobs' => 0,
            'failed_job_ids' => '[]',
            'options' => $this->serialize($batch->options),
            'created_at' => time(),
            'cancelled_at' => null,
            'finished_at' => null,
        ]);

        return $this->find($id);
    }

    /**
     * Increment the total number of jobs within the batch.
     *
     * Both the total and pending counters are raised by $amount and the
     * batch's "finished_at" column is reset to null.
     *
     * @param  string  $batchId
     * @param  int  $amount
     * @return void
     */
    public function incrementTotalJobs(string $batchId, int $amount)
    {
        // $amount is concatenated into a raw expression; the int type
        // declaration guarantees only an integer can reach the SQL string.
        $this->connection->table($this->table)->where('id', $batchId)->update([
            'total_jobs' => new Expression('total_jobs + '.$amount),
            'pending_jobs' => new Expression('pending_jobs + '.$amount),
            'finished_at' => null,
        ]);
    }

    /**
     * Decrement the total number of pending jobs for the batch.
     *
     * The given job ID is also removed from the stored list of failed job IDs.
     *
     * @param  string  $batchId
     * @param  string  $jobId
     * @return \Illuminate\Bus\UpdatedBatchJobCounts
     */
    public function decrementPendingJobs(string $batchId, string $jobId)
    {
        $values = $this->updateAtomicValues($batchId, function ($batch) use ($jobId) {
            return [
                'pending_jobs' => $batch->pending_jobs - 1,
                'failed_jobs' => $batch->failed_jobs,
                // array_values() re-indexes so the JSON stays a list, not an object.
                'failed_job_ids' => json_encode(array_values(array_diff(json_decode($batch->failed_job_ids, true), [$jobId]))),
            ];
        });

        return new UpdatedBatchJobCounts(
            $values['pending_jobs'],
            $values['failed_jobs']
        );
    }

    /**
     * Increment the total number of failed jobs for the batch.
     *
     * The given job ID is appended to the stored list of failed job IDs
     * (duplicates are removed); the pending counter is left unchanged.
     *
     * @param  string  $batchId
     * @param  string  $jobId
     * @return \Illuminate\Bus\UpdatedBatchJobCounts
     */
    public function incrementFailedJobs(string $batchId, string $jobId)
    {
        $values = $this->updateAtomicValues($batchId, function ($batch) use ($jobId) {
            return [
                'pending_jobs' => $batch->pending_jobs,
                'failed_jobs' => $batch->failed_jobs + 1,
                // array_values() re-indexes so the JSON stays a list, not an object.
                'failed_job_ids' => json_encode(array_values(array_unique(array_merge(json_decode($batch->failed_job_ids, true), [$jobId])))),
            ];
        });

        return new UpdatedBatchJobCounts(
            $values['pending_jobs'],
            $values['failed_jobs']
        );
    }

    /**
     * Update an atomic value within the batch.
     *
     * Inside a transaction, the batch row is read with a "for update" lock,
     * passed to the callback, and the column values the callback returns are
     * written back to the same row.
     *
     * @param  string  $batchId
     * @param  \Closure  $callback  Receives the raw batch row; returns the column values to persist.
     * @return array  The values returned by the callback, or an empty array when the batch row does not exist.
     */
    protected function updateAtomicValues(string $batchId, Closure $callback)
    {
        return $this->connection->transaction(function () use ($batchId, $callback) {
            $batch = $this->connection->table($this->table)->where('id', $batchId)
                ->lockForUpdate()
                ->first();

            return is_null($batch) ? [] : tap($callback($batch), function ($values) use ($batchId) {
                $this->connection->table($this->table)->where('id', $batchId)->update($values);
            });
        });
    }

    /**
     * Mark the batch that has the given ID as finished.
     *
     * @param  string  $batchId
     * @return void
     */
    public function markAsFinished(string $batchId)
    {
        $this->connection->table($this->table)->where('id', $batchId)->update([
            'finished_at' => time(),
        ]);
    }

    /**
     * Cancel the batch that has the given ID.
     *
     * Both "cancelled_at" and "finished_at" are set to the current timestamp.
     *
     * @param  string  $batchId
     * @return void
     */
    public function cancel(string $batchId)
    {
        $this->connection->table($this->table)->where('id', $batchId)->update([
            'cancelled_at' => time(),
            'finished_at' => time(),
        ]);
    }

    /**
     * Delete the batch that has the given ID.
     *
     * @param  string  $batchId
     * @return void
     */
    public function delete(string $batchId)
    {
        $this->connection->table($this->table)->where('id', $batchId)->delete();
    }

    /**
     * Prune all of the finished entries older than the given date.
     *
     * @param  \DateTimeInterface  $before
     * @return int  The total number of deleted rows.
     */
    public function prune(DateTimeInterface $before)
    {
        $query = $this->connection->table($this->table)
            ->whereNotNull('finished_at')
            ->where('finished_at', '<', $before->getTimestamp());

        $totalDeleted = 0;

        // Delete in chunks of 1000 until a pass removes nothing.
        do {
            $deleted = $query->take(1000)->delete();

            $totalDeleted += $deleted;
        } while ($deleted !== 0);

        return $totalDeleted;
    }

    /**
     * Prune all of the unfinished entries created before the given date.
     *
     * @param  \DateTimeInterface  $before
     * @return int  The total number of deleted rows.
     */
    public function pruneUnfinished(DateTimeInterface $before)
    {
        $query = $this->connection->table($this->table)
            ->whereNull('finished_at')
            ->where('created_at', '<', $before->getTimestamp());

        $totalDeleted = 0;

        // Delete in chunks of 1000 until a pass removes nothing.
        do {
            $deleted = $query->take(1000)->delete();

            $totalDeleted += $deleted;
        } while ($deleted !== 0);

        return $totalDeleted;
    }

    /**
     * Prune all of the cancelled entries created before the given date.
     *
     * @param  \DateTimeInterface  $before
     * @return int  The total number of deleted rows.
     */
    public function pruneCancelled(DateTimeInterface $before)
    {
        $query = $this->connection->table($this->table)
            ->whereNotNull('cancelled_at')
            ->where('created_at', '<', $before->getTimestamp());

        $totalDeleted = 0;

        // Delete in chunks of 1000 until a pass removes nothing.
        do {
            $deleted = $query->take(1000)->delete();

            $totalDeleted += $deleted;
        } while ($deleted !== 0);

        return $totalDeleted;
    }

    /**
     * Execute the given Closure within a storage specific transaction.
     *
     * @param  \Closure  $callback
     * @return mixed
     */
    public function transaction(Closure $callback)
    {
        return $this->connection->transaction(fn () => $callback());
    }

    /**
     * Serialize the given value.
     *
     * On a PostgreSQL connection the serialized string is additionally
     * base64-encoded.
     *
     * @param  mixed  $value
     * @return string
     */
    protected function serialize($value)
    {
        $serialized = serialize($value);

        // NOTE(review): presumably the raw serialized payload can contain
        // bytes a Postgres text column rejects - confirm.
        return $this->connection instanceof PostgresConnection
            ? base64_encode($serialized)
            : $serialized;
    }

    /**
     * Unserialize the given value.
     *
     * On a PostgreSQL connection, a string containing neither ":" nor ";" is
     * treated as base64 and decoded first; anything else is passed to
     * unserialize() as-is.
     *
     * @param  string  $serialized
     * @return mixed
     */
    protected function unserialize($serialized)
    {
        if ($this->connection instanceof PostgresConnection &&
            ! Str::contains($serialized, [':', ';'])) {
            $serialized = base64_decode($serialized);
        }

        return unserialize($serialized);
    }

    /**
     * Convert the given raw batch to a Batch object.
     *
     * Counters are cast to int, the failed job ID list is JSON-decoded to an
     * array, and non-empty timestamps become CarbonImmutable instances.
     *
     * @param  object  $batch
     * @return \Illuminate\Bus\Batch
     */
    protected function toBatch($batch)
    {
        return $this->factory->make(
            $this,
            $batch->id,
            $batch->name,
            (int) $batch->total_jobs,
            (int) $batch->pending_jobs,
            (int) $batch->failed_jobs,
            json_decode($batch->failed_job_ids, true),
            $this->unserialize($batch->options),
            CarbonImmutable::createFromTimestamp($batch->created_at),
            // Falsy timestamps (e.g. null) are passed through untouched.
            $batch->cancelled_at ? CarbonImmutable::createFromTimestamp($batch->cancelled_at) : $batch->cancelled_at,
            $batch->finished_at ? CarbonImmutable::createFromTimestamp($batch->finished_at) : $batch->finished_at
        );
    }

    /**
     * Get the underlying database connection.
     *
     * @return \Illuminate\Database\Connection
     */
    public function getConnection()
    {
        return $this->connection;
    }

    /**
     * Set the underlying database connection.
     *
     * @param  \Illuminate\Database\Connection  $connection
     * @return void
     */
    public function setConnection(Connection $connection)
    {
        $this->connection = $connection;
    }
}
Онлайн: 1
Реклама