> | string $rows
     * @param array $columnMap
     * @param array $options
     */
    public function __construct(
        protected Import $import,
        protected array | string $rows,
        protected array $columnMap,
        protected array $options = [],
    ) {
        // Resolve the importer once; the job hooks below all delegate to it.
        $this->importer = $this->import->getImporter(
            $this->columnMap,
            $this->options,
        );
    }

    /**
     * Returns the job middleware declared by the importer.
     *
     * @return array
     */
    public function middleware(): array
    {
        return $this->importer->getJobMiddleware();
    }

    /**
     * Imports every row of this chunk and records the outcome on the import.
     *
     * Each row is passed to the importer inside its own database transaction.
     * Failed rows are persisted through logFailedRow(). Unexpected exceptions
     * are collected (one per exception class) and only re-thrown by
     * handleExceptions() after the counters have been updated and
     * ImportChunkProcessed has been dispatched.
     *
     * @throws Throwable when at least one row failed with an unexpected exception
     */
    public function handle(): void
    {
        /** @var Authenticatable $user */
        $user = $this->import->user;

        // Run the chunk as the import's user. Guards that expose no login()
        // method only get the user assigned to them.
        if (method_exists(auth()->guard(), 'login')) {
            auth()->login($user);
        } else {
            auth()->setUser($user);
        }

        $exceptions = [];

        $processedRows = 0;
        $successfulRows = 0;

        // Rows are either already an array or a base64-encoded serialized string.
        // NOTE(review): unserialize() can instantiate arbitrary classes, so this is
        // only safe while the payload is produced by trusted code. Confirm that, or
        // consider ['allowed_classes' => false] if rows never contain objects.
        if (! is_array($this->rows)) {
            $rows = unserialize(base64_decode($this->rows));
        }

        foreach (($rows ?? $this->rows) as $row) {
            // Normalize string values to valid UTF-8 before importing or logging.
            $row = $this->utf8Encode($row);

            try {
                DB::transaction(fn () => ($this->importer)($row));
                $successfulRows++;
            } catch (RowImportFailedException $exception) {
                $this->logFailedRow($row, $exception->getMessage());
            } catch (ValidationException $exception) {
                // Flatten all validation messages into one space-separated string.
                $this->logFailedRow($row, collect($exception->errors())->flatten()->implode(' '));
            } catch (Throwable $exception) {
                // Keep only the latest exception per class; it is re-thrown later.
                $exceptions[$exception::class] = $exception;

                $this->logFailedRow($row);
            }

            // A row counts as processed whether it succeeded or failed.
            $processedRows++;
        }

        // Add this chunk's totals to the stored counters in SQL.
        $this->import::query()
            ->whereKey($this->import)
            ->update([
                'processed_rows' => DB::raw('processed_rows + ' . $processedRows),
                'successful_rows' => DB::raw('successful_rows + ' . $successfulRows),
            ]);

        // Clamp both counters so they never exceed total_rows.
        $this->import::query()
            ->whereKey($this->import)
            ->whereColumn('processed_rows', '>', 'total_rows')
            ->update([
                'processed_rows' => DB::raw('total_rows'),
            ]);

        $this->import::query()
            ->whereKey($this->import)
            ->whereColumn('successful_rows', '>', 'total_rows')
            ->update([
                'successful_rows' => DB::raw('total_rows'),
            ]);

        // Reload the model so the event carries the updated counters.
        $this->import->refresh();

        event(new ImportChunkProcessed(
            $this->import,
            $this->columnMap,
            $this->options,
            $processedRows,
            $successfulRows,
            $exceptions,
        ));

        $this->handleExceptions($exceptions);
    }

    /**
     * Returns the retry deadline for this job, as declared by the importer.
     */
    public function retryUntil(): ?CarbonInterface
    {
        return $this->importer->getJobRetryUntil();
    }

    /**
     * Returns the job tags declared by the importer.
     *
     * @return array
     */
    public function tags(): array
    {
        return $this->importer->getJobTags();
    }

    /**
     * Persists a failed row, with sensitive columns removed, against the import.
     *
     * @param array $data row data as read from the source, keyed by CSV header
     * @param string|null $validationError message to store, or null when the
     *                                     failure was not a row/validation error
     */
    protected function logFailedRow(array $data, ?string $validationError = null): void
    {
        $failedRow = app(FailedImportRow::class);
        $failedRow->import()->associate($this->import);
        $failedRow->data = $this->filterSensitiveData($data);
        $failedRow->validation_error = $validationError;
        $failedRow->save();
    }

    /**
     * Removes the values of all sensitive import columns from a row.
     *
     * A column is dropped only when it is marked sensitive, is mapped to a
     * non-blank CSV header in the column map, and that header exists in the row.
     *
     * @param array $data row data keyed by CSV header
     * @return array the row without the sensitive entries
     */
    protected function filterSensitiveData(array $data): array
    {
        return array_reduce(
            $this->importer->getColumns(),
            function (array $carry, ImportColumn $column): array {
                if (! $column->isSensitive()) {
                    return $carry;
                }

                // Translate the column name to the CSV header used as the row key.
                $csvHeader = $this->columnMap[$column->getName()] ?? null;

                if (blank($csvHeader)) {
                    return $carry;
                }

                if (! array_key_exists($csvHeader, $carry)) {
                    return $carry;
                }

                unset($carry[$csvHeader]);

                return $carry;
            },
            initial: $data,
        );
    }

    /**
     * Recursively re-encodes every string in the value as UTF-8.
     *
     * Strings are converted from UTF-8 to UTF-8, which leaves valid text
     * untouched and substitutes malformed byte sequences. Arrays are processed
     * element by element with their keys preserved; any other type is returned
     * unchanged.
     */
    protected function utf8Encode(mixed $value): mixed
    {
        if (is_array($value)) {
            return array_map($this->utf8Encode(...), $value);
        }

        if (is_string($value)) {
            return mb_convert_encoding($value, 'UTF-8', 'UTF-8');
        }

        return $value;
    }

    /**
     * Re-throws the unexpected exceptions collected while processing the chunk.
     *
     * With a single exception class, that exception is thrown as is. With
     * several classes, a generic Exception naming all of them is thrown; the
     * first collected exception is attached as its "previous" so that at least
     * one original stack trace is still available.
     *
     * @param array $exceptions collected exceptions keyed by their class name
     *
     * @throws Throwable
     */
    protected function handleExceptions(array $exceptions): void
    {
        if (empty($exceptions)) {
            return;
        }

        if (count($exceptions) > 1) {
            throw new Exception(
                'Multiple types of exceptions occurred: [' . implode('], [', array_keys($exceptions)) . ']',
                previous: Arr::first($exceptions),
            );
        }

        throw Arr::first($exceptions);
    }
}