Collecting data and sending to Apache Kafka

Introduction



To analyze streaming data, you first need sources of such data. What the sources actually provide matters too, and sources of textual information, for example, are rare.



Interesting sources include Twitter and VK, but they are not suitable for every task.



There are also sources that have the required data but are not streaming. The public-apis list is one place to find them.



For such sources, the old approach still works for streaming tasks: download the data and send it to a stream.



For example, you can use IMDb as the source.

Note that IMDb also publishes its data itself (see IMDb Datasets). However, it is reasonable to assume that data collected directly from the site is more current.



Language: Java 1.8.

Libraries: kafka 2.6.0, jsoup 1.13.1.



Data collection



Data collection is implemented as a service that, given its input parameters, loads HTML pages, finds the required information, and converts it into a collection of objects.



Source: imdb. URL: https://www.imdb.com/search/title/?release_date=%s,%s&countries=%s

Parameters 1 and 2 are the start and end of the release date range. Parameter 3 is the countries.
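How the three placeholders are filled can be sketched with `String.format`. This is a minimal illustration, not the article's code: the class name `SearchUrlSketch`, the method name `build`, and the sample date and country values are assumptions chosen for the example.

```java
final class SearchUrlSketch {
    // Base URL and search template as given in the text above.
    static final String BASE_URL = "https://www.imdb.com";
    static final String TEMPLATE = "/search/title/?release_date=%s,%s&countries=%s";

    /** Substitutes start date, end date, and countries into the template, in that order. */
    static String build(String startDate, String endDate, String countries) {
        return String.format(BASE_URL + TEMPLATE, startDate, endDate, countries);
    }

    public static void main(String[] args) {
        // Illustrative values: a single day and a single country code.
        System.out.println(build("2020-11-15", "2020-11-15", "us"));
        // prints https://www.imdb.com/search/title/?release_date=2020-11-15,2020-11-15&countries=us
    }
}
```

Using the same date for both bounds restricts the search to titles released on that day.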



See also: imdb-extensive-dataset.



The service interface:



public interface MovieDirectScrapingService {
    Collection<Movie> scrap();
}


Movie is the class that holds the data collected for one title.



class Movie {
    public final String titleId;
    public final String titleUrl;
    public final String title;
    public final String description;
    public final Double rating;
    public final String genres;
    public final String runtime;
    public final String baseUrl;
    public final String baseNameUrl;
    public final String baseTitleUrl;
    public final String participantIds;
    public final String participantNames;
    public final String directorIds;
    public final String directorNames;
    // …
}


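Pages are loaded and parsed with jsoup. The method below fetches one HTML page, collects the items found on it, and returns the URL of the next page (or an empty string if there is none).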



String scrap(String url, List<Movie> items) {
    Document doc = null;
    try {
        doc = Jsoup.connect(url).header("Accept-Language", language).get();
    } catch (IOException e) {
        e.printStackTrace();
    }
    if (doc != null) {
        collectItems(doc, items);
        return nextUrl(doc);
    }
    return "";
}


The URL of the next page is taken from the "next page" link.



String nextUrl(Document doc) {
    Elements nextPageElements = doc.select(".next-page");
    if (!nextPageElements.isEmpty()) {
        Element hrefElement = nextPageElements.get(0);
        // Prepend the base URL to the href of the "next page" link.
        return baseUrl + hrefElement.attr("href");
    }
    return "";
}


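The main method builds the search URL from the input parameters and then loads the pages one by one, with a short pause between requests, until there is no next page.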



@Override
public Collection<Movie> scrap() {
    String url = String.format(
            baseUrl + "/search/title/?release_date=%s,%s&countries=%s",
            startDate, endDate, countries
    );
    List<Movie> items = new ArrayList<>();
    String nextUrl = url;
    while (true) {
        nextUrl = scrap(nextUrl, items);
        if ("".equals(nextUrl)) {
            break;
        }
        try {
            Thread.sleep(50);
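        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop paging instead of swallowing the interrupt.
            Thread.currentThread().interrupt();
            break;
        }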
    }
    return items;
}
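The paging loop can be tried without any network access by replacing the page loader with a stub. The sketch below is an illustration under assumptions: `PaginationSketch`, `collectAll`, and the `maxPages` safety cap are hypothetical names and additions that are not part of the original service, and the items are plain strings rather than `Movie` objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class PaginationSketch {
    /**
     * Follows "next page" links starting at firstUrl. The fetchPage function adds the
     * items of one page to the list and returns the next URL, or "" on the last page.
     * maxPages is a safety cap (an assumption of this sketch) against link cycles.
     */
    static List<String> collectAll(String firstUrl,
                                   BiFunction<String, List<String>, String> fetchPage,
                                   int maxPages) {
        List<String> items = new ArrayList<>();
        String url = firstUrl;
        int pages = 0;
        while (!url.isEmpty() && pages < maxPages) {
            url = fetchPage.apply(url, items);
            pages++;
        }
        return items;
    }

    public static void main(String[] args) {
        // Stub "site": each page name maps to the name of the next page.
        Map<String, String> nextLinks = new HashMap<>();
        nextLinks.put("page1", "page2");
        nextLinks.put("page2", "page3");
        nextLinks.put("page3", "");

        List<String> items = collectAll("page1", (url, out) -> {
            out.add("item-from-" + url);
            return nextLinks.getOrDefault(url, "");
        }, 100);
        System.out.println(items); // [item-from-page1, item-from-page2, item-from-page3]
    }
}
```

Passing the loader as a function keeps the loop itself testable. In the real service that function would be the jsoup-based `scrap(url, items)` method.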


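Sending data

Class: MovieProducer. Method: run.

The method obtains the collection of movies from the scraping service, serializes each movie to a JSON string, and sends the resulting key/value records to the Kafka topic.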



public void run() {
    try (SimpleStringStringProducer producer = new SimpleStringStringProducer(
            bootstrapServers, clientId, topic)) {
        Collection<Data.Movie> movies = movieDirectScrapingService.scrap();
        List<SimpleStringStringProducer.KeyValueStringString> kvList = new ArrayList<>();
        for (Data.Movie movie : movies) {
            Map<String, String> map = new HashMap<>();
            map.put("title_id", movie.titleId);
            map.put("title_url", movie.titleUrl);
            …
            String value = JSONObject.toJSONString(map);
            String key = UUID.randomUUID().toString();
            kvList.add(new SimpleStringStringProducer.KeyValueStringString(key, value));
        }
        producer.produce(kvList);
    }
}




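Class: MovieDirectScrapingExecutor. Method: run.

The method runs in an endless loop. On each iteration it builds the date range for the current day, picks the next country from the list, and passes these parameters to execute.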



public void run() {
    int countriesCounter = 0;
    List<String> countriesSource = Arrays.asList("us");

    while (true) {
        try {
            LocalDate localDate = LocalDate.now();

            int year = localDate.getYear();
            int month = localDate.getMonthValue();
            int day = localDate.getDayOfMonth();

            // Zero-pad single-digit values (1..9) to two digits.
            String monthString = month < 10 ? "0" + month : Integer.toString(month);
            String dayString = day < 10 ? "0" + day : Integer.toString(day);

            String startDate = year + "-" + monthString + "-" + dayString;
            String endDate = startDate;

            String language = "en";
            String countries = countriesSource.get(countriesCounter);

            execute(language, startDate, endDate, countries);

            Thread.sleep(1000);

            countriesCounter += 1;
            if (countriesCounter >= countriesSource.size()) {
                countriesCounter = 0;
            }

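        } catch (InterruptedException e) {
            // Restore the interrupt flag and leave the loop so the thread can stop.
            Thread.currentThread().interrupt();
            return;
        }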
    }
}
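The `yyyy-MM-dd` string that run assembles by hand can also be produced with `java.time` directly, which avoids manual zero-padding at boundary values such as the 9th day or month. A minimal sketch, where the class name `DateParamSketch` and the method name `toDateParam` are hypothetical:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

final class DateParamSketch {
    /** Formats a date as yyyy-MM-dd, with zero-padded month and day. */
    static String toDateParam(LocalDate date) {
        return date.format(DateTimeFormatter.ISO_LOCAL_DATE);
    }

    public static void main(String[] args) {
        System.out.println(toDateParam(LocalDate.of(2020, 9, 9)));   // 2020-09-09
        System.out.println(toDateParam(LocalDate.of(2020, 11, 15))); // 2020-11-15
    }
}
```

`DateTimeFormatter` is available from Java 8, so this fits the Java 1.8 target stated in the introduction.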


MovieDirectScrapingExecutor, in turn, can be started, for example, from main.



An example of a record sent to the topic:



{
  "base_name_url": "https:\/\/www.imdb.com\/name",
  "participant_ids": "nm7947173~nm2373827~nm0005288~nm0942193~",
  "title_id": "tt13121702",
  "rating": "0.0",
  "base_url": "https:\/\/www.imdb.com",
  "description": "It's Christmas time and Jackie (Carly Hughes), an up-and-coming journalist, finds that her life is at a crossroads until she finds an unexpected opportunity - to run a small-town newspaper ... See full summary »",
  "runtime": "",
  "title": "The Christmas Edition",
  "director_ids": "nm0838289~",
  "title_url": "\/title\/tt13121702\/?ref_=adv_li_tt",
  "director_names": "Peter Sullivan~",
  "genres": "Drama, Romance",
  "base_title_url": "https:\/\/www.imdb.com\/title",
  "participant_names": "Carly Hughes~Rob Mayes~Marie Osmond~Aloma Wright~"
}
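In the record above, multi-valued fields such as participant_ids and participant_names are stored as strings in which every value is followed by "~". A consumer could split them back into lists as sketched below. The class `TildeListSketch` and its methods are hypothetical, and pairing ids with names by position is an assumption based on the sample record, not something the source states.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TildeListSketch {
    /** Splits a "~"-terminated list such as "a~b~" into [a, b]; "" yields an empty list. */
    static List<String> split(String joined) {
        List<String> parts = new ArrayList<>();
        for (String part : joined.split("~")) {
            if (!part.isEmpty()) {
                parts.add(part);
            }
        }
        return parts;
    }

    /** Pairs ids with names by position (assumed to be parallel lists). */
    static Map<String, String> zip(String ids, String names) {
        List<String> idList = split(ids);
        List<String> nameList = split(names);
        if (idList.size() != nameList.size()) {
            throw new IllegalArgumentException("ids and names differ in length");
        }
        Map<String, String> byId = new LinkedHashMap<>();
        for (int i = 0; i < idList.size(); i++) {
            byId.put(idList.get(i), nameList.get(i));
        }
        return byId;
    }

    public static void main(String[] args) {
        // Values taken from the sample record.
        System.out.println(split("nm7947173~nm2373827~nm0005288~nm0942193~"));
        System.out.println(zip("nm0838289~", "Peter Sullivan~"));
    }
}
```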


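Testing

To check the producer, the test needs a running Kafka server and a Kafka consumer that reads the messages back. The test below runs Apache Kafka locally as a Kafka Server through KafkaServerService, substitutes a stub for the scraping service, and verifies the record read from the topic.

Test class: MovieProducerTest.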



public class MovieProducerTest {
    @Test
    void simple() throws InterruptedException {
        String brokerHost = "127.0.0.1";
        int brokerPort = 29092;
        String zooKeeperHost = "127.0.0.1";
        int zooKeeperPort = 22183;
        String bootstrapServers = brokerHost + ":" + brokerPort;
        String topic = "q-data";
        String clientId = "simple";
        try (KafkaServerService kafkaServerService = new KafkaServerService(
                brokerHost, brokerPort, zooKeeperHost, zooKeeperPort
        )
        ) {
            kafkaServerService.start();
            kafkaServerService.createTopic(topic);

            MovieDirectScrapingService movieDirectScrapingServiceImpl = () -> Collections.singleton(
                    new Data.Movie(…)
            );
            MovieProducer movieProducer =
                    new MovieProducer(bootstrapServers, clientId, topic, movieDirectScrapingServiceImpl);
            movieProducer.run();

            kafkaServerService.poll(topic, "simple", 1, 5, (records) -> {
                assertTrue(records.count() > 0);
                ConsumerRecord<String, String> record = records.iterator().next();
                JSONParser jsonParser = new JSONParser();
                JSONObject jsonObject = null;
                try {
                    jsonObject = (JSONObject) jsonParser.parse(record.value());
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                assertNotNull(jsonObject);
                …
            });

            Thread.sleep(5000);
        }
    }
}







