DataFlow tutorials

Crédit photo : pxfuel.com

Goal

This tutorial will teach you how to periodically import French cities from an external HTTP API into Ibexa Content with Dataflow and eZ Dataflow.

Install Ibexa Content 3.3

To follow this tutorial, you just need to use the latest version of Ibexa Content 3.3. You can follow this guide to install a new instance on your computer.

Install Dataflow and eZ Dataflow

To install the bundles, use composer

//

$ composer require code-rhapsodie/dataflow-bundle code-rhapsodie/ezdataflow-bundle ^v3.0

If it is not done yet, add those two lines in the app/config/bundles.php  file:

//app/config/bundles.php

CodeRhapsodie\DataflowBundle\CodeRhapsodieDataflowBundle::class => ['all' => true],
CodeRhapsodie\EzDataflowBundle\CodeRhapsodieEzDataflowBundle::class => ['all' => true],​​​​

And import the routing:

//app/config/routes.yaml

_cr.dataflow:
    resource: '@CodeRhapsodieEzDataflowBundle/Resources/config/routing.yaml'

 

Add the City content type

In your Ibexa Content admin UI, add a new content type with these parameters:

Write the reader for Geo API

The reader must return an iterable. In this tutorial, we use a generator.

The reader calls the french government API to download in JSON format the cities list. After JSON decoding, the reader yields each object representing a French city.

Make a new folder Service in src/  and add a new PHP class named GeoClient .

//src/Service/GeoClient.php

<?php

namespace App\Service;


use Symfony\Contracts\HttpClient\HttpClientInterface;

class GeoClient
{
    private $client;

    public function __construct(HttpClientInterface $client)
    {
        $this->client = $client;
    }


    public function read($url): iterable
    {
        $response = $this->client->request('GET', $url);

        $result = json_decode($response->getContent(), true);

        if (false === $result) {
            return [];
        }

        foreach ($result as $row) {
            yield $row;
        }

    }
}
 

Now you need to configure the service like this:

//app/config/services.yaml

services:
    # [...]

    App\Service\GeoClient:
        arguments:
                 - '@Symfony\Contracts\HttpClient\HttpClientInterface'

This tutorial assumes auto-wiring is enabled for all your services.

Define the DataflowType and add the reader

Now, it's time to write the DataflowType class. Add a new folder named DataflowType in the folder src/  and add a new class named FrenchCitiesDataflow

The Dataflow bundle provides an abstract class CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType to help you to write your dataflow type.

So, your class like this:

//src/DataflowType/FrenchCitiesDataflow.php

<?php

namespace App\DataflowType;


use CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder;

class FrenchCitiesDataflow extends AbstractDataflowType
{

    protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {
    }

    public function getLabel(): string
    {
        return "French cities";
    }

}

The getLabel function returns the Dataflow name to be displayed in the Ibexa Content admin UI.

To define the reader into the dataflow builder, you need to inject it as a dependency in this class constructor.

Now, your class looks like this:

//src/DataflowType/FrenchCitiesDataflow.php

<?php

namespace App\DataflowType;

use App\Service\GeoClient;
use CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder;

class FrenchCitiesDataflow extends AbstractDataflowType
{

    /**
     * @var GeoClient
     */
    private $geoClient;

    public function __construct(
        GeoClient $geoClient
    ) {
        $this->geoClient = $geoClient;
    }

    protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {
        $builder->setReader($this->geoClient->read($options['url']));

    }

}

Add all dataflow steps

A step is a callable who has one and only one responsibility. For this tutorial, I need two steps:

  1. One filter to reject all data array lacking the keys codesPostaux and  population .
  2. One step to transform the array data into a ContentStructure.

This code shows that filter as an anonymous function:

//src/DataflowType/FrenchCitiesDataflow.php

// [...]

protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {
        $builder->setReader($this->geoClient->read($options['url']))
            // This step is a filter
            ->addStep(function ($data) {
                if (empty($data['codesPostaux']) || empty($data['population'])) {
                    //reject data
                    return false;
                }
                return $data;
            });
    }

// [...]

This code is an anonymous function prepating data and calling the ContentStuctureFactory provided by eZ Dataflow bundle: 

//src/DataflowType/FrenchCitiesDataflow.php

 

//[...]

protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {
        $builder->setReader($this->geoClient->read($options['url']))
            // This step is a filter
            ->addStep(function ($data) {
                if (empty($data['codesPostaux']) || empty($data['population'])) {
                    //reject data
                    return false;
                }
                return $data;
            })
            // This step transform the data in content structure
            ->addStep(function ($data) use ($options) {

                $remoteId = sprintf('french-city-%d', $data['code']);

                unset($data['code']);

                $data['codesPostaux'] = implode(',', $data['codesPostaux']);

                return $this->contentStructureFactory->transform(
                    $data,
                    $remoteId,
                    $options['language'],
                    $options['content_type'],
                    $options['parent_location_id']
                );

            });
    }

//[...]

Define the writer

In your dataflow, you can define more than one writer. All writers must implement this interface CodeRhapsodie\DataflowBundle\DataflowType\Writer
WriterInterface
.

eZ Dataflow bundle provides a generic writer CodeRhapsodie\EzDataflowBundle\Writer\ContentWriter to save content structures as Ibexa contents. This writer is available as a service.

In this tutorial, we use this generic writer:

//src/DataflowType/FrenchCitiesDataflow.php

//[...]

use CodeRhapsodie\EzDataflowBundle\Writer\ContentWriter;

//[...]

class FrenchCitiesDataflow extends AbstractDataflowType
{
    /**
     * @var ContentWriter
     */
    private $contentWriter;

// [...]

    public function __construct(
        GeoClient $geoClient,
        ContentWriter $contentWriter,
        ContentStructureFactory $contentStructureFactory
    ) {
        $this->geoClient = $geoClient;
        $this->contentWriter = $contentWriter;
        $this->contentStructureFactory = $contentStructureFactory;
    }

    protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {

        //[...]

        $builder->addWriter($this->contentWriter);

    }

}

Define options needed to run

In order to run this dataflow, some options must be defined by the user:

  • the API URL where the reader can download the JSON data.
  • the identifier of the content type used to create new contents.
  • the id of parent location where contents will be created.
  • the language used to save the content

The AbstractDataflowType provides a protected function that lets you define the options using the OptionsResolver.

The completed function looks like this:

//src/DataflowType/FrenchCitiesDataflow.php

    protected function configureOptions(OptionsResolver $optionsResolver): void
    {
        $optionsResolver->setDefaults([
            'url' => null,
            'content_type' => null,
            'parent_location_id' => null,
            'language' => 'eng-GB'
        ]);
        $optionsResolver->setRequired(['url', 'content_type', 'parent_location_id']);
    }

 

Full dataflow

The completed FrenchCitiesDataflow class: 

//src/DataflowType/FrenchCitiesDataflow.php

<?php

namespace App\DataflowType;

use App\Service\GeoClient;
use CodeRhapsodie\DataflowBundle\DataflowType\AbstractDataflowType;
use CodeRhapsodie\DataflowBundle\DataflowType\DataflowBuilder;
use CodeRhapsodie\EzDataflowBundle\Factory\ContentStructureFactory;
use CodeRhapsodie\EzDataflowBundle\Writer\ContentWriter;
use Symfony\Component\OptionsResolver\OptionsResolver;

class FrenchCitiesDataflow extends AbstractDataflowType
{

    /**
     * @var ContentWriter
     */
    private $contentWriter;

    /**
     * @var ContentStructureFactory
     */
    private $contentStructureFactory;

    /**
     * @var GeoClient
     */
    private $geoClient;

    public function __construct(
        GeoClient $geoClient,
        ContentWriter $contentWriter,
        ContentStructureFactory $contentStructureFactory
    ) {
        $this->geoClient = $geoClient;
        $this->contentWriter = $contentWriter;
        $this->contentStructureFactory = $contentStructureFactory;
    }

    protected function buildDataflow(DataflowBuilder $builder, array $options): void
    {
        $builder->setReader($this->geoClient->read($options['url']))
            // This step is a filter
            ->addStep(function ($data) {
                if (empty($data['codesPostaux']) || empty($data['population'])) {
                    //reject data
                    return false;
                }
                return $data;
            })
            // This step transform the data in content structure
            ->addStep(function ($data) use ($options) {

                $remoteId = sprintf('french-city-%d', $data['code']);

                unset($data['code']);

                $data['codesPostaux'] = implode(',', $data['codesPostaux']);

                return $this->contentStructureFactory->transform(
                    $data,
                    $remoteId,
                    $options['language'],
                    $options['content_type'],
                    $options['parent_location_id']
                );

            })
            ->addWriter($this->contentWriter);
    }

    public function getLabel(): string
    {
        return "French cities";
    }

    protected function configureOptions(OptionsResolver $optionsResolver): void
    {
        $optionsResolver->setDefaults([
            'url' => null,
            'content_type' => null,
            'parent_location_id' => null,
            'language' => 'eng-GB'
        ]);
        $optionsResolver->setRequired(['url', 'content_type', 'parent_location_id']);
    }

    public function getAliases(): iterable
    {
        return ['fc'];
    }

}
 

To run a dataflow, use the Symfony command code-rhapsodie:dataflow:execute  with the dataflow type FQCN or one of its aliases. Aliases for dataflow types are defined by overriding the getAliases function.

Check the Symfony configuration

Now, we can check if Symfony has your dataflow type registered as a service.

//

bin/console debug:container --tag coderhapsodie.dataflow.type --show-arguments

The output console looks like this:

If your dataflow is not in the output, check if your service uses Symfony auto-configuration. If you don't use auto-configuration, check if the dataflow type class is correctly configured in your service.yml file with the tag coderhapsodie.dataflow.type .

Update the database

This bundle uses Doctrine DBAL to store Dataflow schedule into the database table (cr_dataflow_scheduled ) and jobs (cr_dataflow_job ).

If you use Doctrine Migration Bundle or Phinx or eZ Migration Bundle or whatever, you can add a new migration with the generated SQL query from this command:

//

php bin/console code-rhapsodie:dataflow:dump-schema

If you have already the tables, you can add a new migration with the generated update SQL query from this command:

//

php bin/console code-rhapsodie:dataflow:dump-schema --update

Add the Symfony command into crontab

To run dataflow when you need, define for each dataflow a first execution date and time and a frequency. This informations are stored into the database.

To actually execute all dataflow job, schedule this command every 5 minutes in your crontab:

//

/path/to/your/project/bin/console code-rhapsodie:dataflow:run-pending

This command checks and executes each dataflow job waiting to be executed according to the frequency.

Define the dataflow schedule

To define a dataflow schedule, you have two ways:

  1. Use Symfony console command code-rhapsodie:dataflow:schedule:add .
  2. Use eZ Platform back-office with a module provided by eZ Dataflow bundle.

Log in in the admin UI, then click on "Admin" on the top, and "eZ Dataflow" just on the line below.

On the new page, click on the "+" button to open the schedule pop in.

Fill in all fields. Keep in mind that the "Options" field must contain valid YAML.

Once all the fields are filled, run the following symfony command to start the execution of your dataflow which has been scheduled:

//

php bin/console code-rhapsodie:dataflow:execute fc '{"url":"https:\/\/geo.api.gouv.fr\/communes?fields=nom,code,codesPostaux,codeDepartement,codeRegion,population&format=json&geometry=centre","content_type":"city","parent_location_id":54}'

Check the import

On eZ Dataflow back-office page, the scheduled dataflow list contains one line. This job will be executed automatically by the Symfony command previously added in the crontab.

On the list displayed into the pop in, click on the rounded back arrow icon to read the job details :

After reading the job details, you can go to the content to view all imported contents. Click on "Content" on the page top, and "Content structure" just on the line below.

Browse to the parent content and read the "sub-items" section:

Conclusion

In this tutorial, you learned how to:

  • Install Dataflow and eZ Dataflow on your Ibexa Content 3.3 web site.
  • Configure Symfony to enable these bundles.
  • Implement one dataflow.
  • Schedule this dataflow.
  • Run all pending jobs in the background with Cron.
  • Check the job result.

PS: This repository contains samples of code from this tutorial