Вход Регистрация
Файл: includes/library/aws/Aws/S3/BatchDelete.php
Строк: 235
<?php
namespace AwsS3;

use 
AwsAwsClientInterface;
use 
AwsS3ExceptionDeleteMultipleObjectsException;
use 
GuzzleHttpPromise;
use 
GuzzleHttpPromisePromisorInterface;
use 
GuzzleHttpPromisePromiseInterface;

/**
 * Efficiently deletes many objects from a single Amazon S3 bucket using an
 * iterator that yields keys. Deletes are made using the DeleteObjects API
 * operation.
 *
 *     $s3 = new AwsS3Client([
 *         'region' => 'us-west-2',
 *         'version' => 'latest'
 *     ]);
 *
 *     $listObjectsParams = ['Bucket' => 'foo', 'Prefix' => 'starts/with/'];
 *     $delete = AwsS3BatchDelete::fromListObjects($s3, $listObjectsParams);
 *     // Asynchronously delete
 *     $promise = $delete->promise();
 *     // Force synchronous completion
 *     $delete->delete();
 *
 * When using one of the batch delete creational static methods, you can supply
 * an associative array of options:
 *
 * - before: Function invoked before executing a command. The function is
 *   passed the command that is about to be executed. This can be useful
 *   for logging, adding custom request headers, etc.
 * - batch_size: The size of each delete batch. Defaults to 1000.
 *
 * @link http://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
 */
class BatchDelete implements PromisorInterface
{
    private 
$bucket;
    
/** @var AwsClientInterface */
    
private $client;
    
/** @var callable */
    
private $before;
    
/** @var PromiseInterface */
    
private $cachedPromise;
    
/** @var callable */
    
private $promiseCreator;
    private 
$batchSize 1000;
    private 
$queue = [];

    
/**
     * Creates a BatchDelete object from all of the paginated results of a
     * ListObjects operation. Each result that is returned by the ListObjects
     * operation will be deleted.
     *
     * @param AwsClientInterface $client            AWS Client to use.
     * @param array              $listObjectsParams ListObjects API parameters
     * @param array              $options           BatchDelete options.
     *
     * @return BatchDelete
     */
    
public static function fromListObjects(
        
AwsClientInterface $client,
        array 
$listObjectsParams,
        array 
$options = []
    ) {
        
$iter $client->getPaginator('ListObjects'$listObjectsParams);
        
$bucket $listObjectsParams['Bucket'];
        
$fn = function (BatchDelete $that) use ($iter) {
            return 
$iter->each(function ($result) use ($that) {
                
$promises = [];
                foreach (
$result['Contents'] as $object) {
                    if (
$promise $that->enqueue($object)) {
                        
$promises[] = $promise;
                    }
                }
                return 
$promises Promiseall($promises) : null;
            });
        };

        return new 
self($client$bucket$fn$options);
    }

    
/**
     * Creates a BatchDelete object from an iterator that yields results.
     *
     * @param AwsClientInterface $client  AWS Client to use to execute commands
     * @param string             $bucket  Bucket where the objects are stored
     * @param Iterator          $iter    Iterator that yields assoc arrays
     * @param array              $options BatchDelete options
     *
     * @return BatchDelete
     */
    
public static function fromIterator(
        
AwsClientInterface $client,
        
$bucket,
        
Iterator $iter,
        array 
$options = []
    ) {
        
$fn = function (BatchDelete $that) use ($iter) {
            return 
GuzzleHttpPromisecoroutine(function () use ($that$iter) {
                foreach (
$iter as $obj) {
                    if (
$promise $that->enqueue($obj)) {
                        yield 
$promise;
                    }
                }
            });
        };

        return new 
self($client$bucket$fn$options);
    }

    public function 
promise()
    {
        if (!
$this->cachedPromise) {
            
$this->cachedPromise $this->createPromise();
        }

        return 
$this->cachedPromise;
    }

    
/**
     * Synchronously deletes all of the objects.
     *
     * @throws DeleteMultipleObjectsException on error.
     */
    
public function delete()
    {
        
$this->promise()->wait();
    }

    
/**
     * @param AwsClientInterface $client    Client used to transfer the requests
     * @param string             $bucket    Bucket to delete from.
     * @param callable           $promiseFn Creates a promise.
     * @param array              $options   Hash of options used with the batch
     *
     * @throws InvalidArgumentException if the provided batch_size is <= 0
     */
    
private function __construct(
        
AwsClientInterface $client,
        
$bucket,
        callable 
$promiseFn,
        array 
$options = []
    ) {
        
$this->client $client;
        
$this->bucket $bucket;
        
$this->promiseCreator $promiseFn;

        if (isset(
$options['before'])) {
            if (!
is_callable($options['before'])) {
                throw new 
InvalidArgumentException('before must be callable');
            }
            
$this->before $options['before'];
        }

        if (isset(
$options['batch_size'])) {
            if (
$options['batch_size'] <= 0) {
                throw new 
InvalidArgumentException('batch_size is not > 0');
            }
            
$this->batchSize min($options['batch_size'], 1000);
        }
    }

    private function 
enqueue(array $obj)
    {
        
$this->queue[] = $obj;
        return 
count($this->queue) >= $this->batchSize
            
$this->flushQueue()
            : 
null;
    }

    private function 
flushQueue()
    {
        static 
$validKeys = ['Key' => true'VersionId' => true];

        if (
count($this->queue) === 0) {
            return 
null;
        }

        
$batch = [];
        while (
$obj array_shift($this->queue)) {
            
$batch[] = array_intersect_key($obj$validKeys);
        }

        
$command $this->client->getCommand('DeleteObjects', [
            
'Bucket' => $this->bucket,
            
'Delete' => ['Objects' => $batch]
        ]);

        if (
$this->before) {
            
call_user_func($this->before$command);
        }

        return 
$this->client->executeAsync($command)
            ->
then(function ($result) {
                if (!empty(
$result['Errors'])) {
                    throw new 
DeleteMultipleObjectsException(
                        
$result['Deleted'] ?: [],
                        
$result['Errors']
                    );
                }
                return 
$result;
            });
    }

    
/**
     * Returns a promise that will clean up any references when it completes.
     *
     * @return PromiseInterface
     */
    
private function createPromise()
    {
        
// Create the promise
        
$promise call_user_func($this->promiseCreator$this);
        
$this->promiseCreator null;

        
// Cleans up the promise state and references.
        
$cleanup = function () {
            
$this->before $this->client $this->queue null;
        };

        
// When done, ensure cleanup and that any remaining are processed.
        
return $promise->then(
            function () use (
$cleanup)  {
                return 
Promisepromise_for($this->flushQueue())
                    ->
then($cleanup);
            },
            function (
$reason) use ($cleanup)  {
                
$cleanup();
                return 
Promiserejection_for($reason);
            }
        );
    }
}
Онлайн: 1
Реклама