aio api crawler

Hello. I have started working on a library for pulling data from different JSON APIs. It can also be used to test APIs.



APIs are described as classes, for example:



class Categories(JsonEndpoint):
    url = "http://127.0.0.1:8888/categories"
    # query-string parameters; the range is iterated page by page
    params = {"page": range(100), "language": "en"}
    # get_user_agent is a function that returns a random user agent per request
    headers = {"User-Agent": get_user_agent}
    # dotted path into the JSON response: yield the "slug" of every array item
    results_key = "*.slug"

categories = Categories()


class Posts(JsonEndpoint):
    url = "http://127.0.0.1:8888/categories/{category}/posts"
    params = {"page": range(100), "language": "en"}
    # the {category} placeholder is filled from another endpoint's results
    url_params = {"category": categories.iter_results()}
    results_key = "posts"

    async def comments(self, post):
        # Comments is another JsonEndpoint (not shown here), queried per post
        comments = Comments(
            self.session,
            url_params={"category": post.url.params["category"], "id": post["id"]},
        )
        return [comment async for comment in comments]

posts = Posts()


Params and url_params can contain functions (like get_user_agent here, which returns a random user agent), ranges, iterators, awaitables, and asynchronous iterators (so endpoints can be chained together).



The headers and cookies parameters can also contain functions and awaitables.
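As a rough sketch of what that allows (SearchPosts and get_token are hypothetical, the body of get_user_agent is only illustrative, and exactly how awaitables are resolved is my assumption):

import random

def get_user_agent():
    # a plain function: called to produce a fresh value for each request
    return random.choice(["agent-a/1.0", "agent-b/2.0"])

async def get_token():
    # a coroutine: its result is awaited (hypothetical helper)
    return "secret-token"

class SearchPosts(JsonEndpoint):
    url = "http://127.0.0.1:8888/posts"
    params = {"page": range(10), "language": "en"}
    headers = {"User-Agent": get_user_agent, "Authorization": get_token}
    cookies = {"session": get_token}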



The categories API in the example above returns an array of objects that each have a slug, and the iterator yields exactly those slugs. By passing this iterator into the url_params of Posts, the crawler recursively iterates over all categories and over all pages within each one. When it encounters a 404 or some other error, it stops and moves on to the next category.
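As a rough usage sketch (assuming endpoints are consumed as asynchronous iterators, which is what the comments method above suggests):

async def crawl():
    # iteration over all categories, and over every page within each,
    # is driven by the endpoint itself via the chained iterators
    async for post in posts:
        print(post["id"])

    # the category slugs can also be consumed directly
    async for slug in categories.iter_results():
        print(slug)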



The repository also contains an example aiohttp server for these classes, so everything can be tested.



In addition to GET parameters, you can pass them as data or json and set a different HTTP method.
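Something along these lines might express that; the exact attribute names (method, json) are my assumption and are not confirmed by the post:

class CreateComment(JsonEndpoint):
    url = "http://127.0.0.1:8888/comments"
    method = "post"            # attribute name assumed; overrides the default GET
    json = {"text": "hello"}   # body sent as JSON instead of query parameters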



results_key is a dotted path, and the library will try to pull those keys out of the results. For example, "comments.*.text" will return the text of each comment from the array inside comments.
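The semantics can be illustrated with a plain-Python sketch of a dotted-path lookup (this is only an illustration, not the library's actual implementation):

def extract(data, key):
    # illustrative dotted-path lookup: "*" fans out over a list
    head, _, rest = key.partition(".")
    if not head:
        return [data]
    if head == "*":
        return [item for element in data for item in extract(element, rest)]
    return extract(data[head], rest)

# extract({"comments": [{"text": "a"}, {"text": "b"}]}, "comments.*.text")
# -> ["a", "b"]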



The results are wrapped in a wrapper that has url and params properties. url is derived from str and also carries params. This way you can find out which parameters were used to obtain a given result, as demonstrated in the comments method.



There is also a base Sink class for handling the results, for example pushing them into a message queue or a database. It runs in separate tasks and receives data via an asyncio.Queue.



class LoggingSink(Sink):
    def transform(self, obj):
        # prepare the object before it is processed; returning None skips it
        return repr(obj)

    async def init(self):
        # asynchronous setup hook for the sink's tasks
        from loguru import logger

        self.logger = logger

    async def process(self, obj):
        # handle a single object taken from the queue
        self.logger.info(obj)
        return True

sink = LoggingSink(num_tasks=1)


An example of the simplest Sink. The transform method lets us manipulate the object and return None if it does not suit us; in other words, it can also be used for validation.
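Building on that description, a sink could use transform for validation, something like this sketch (not taken from the repository):

class ValidatingSink(LoggingSink):
    def transform(self, obj):
        if "text" not in obj:
            return None        # returning None skips the object entirely
        return repr(obj)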



Sink is an asynchronous context manager which, on exit, should wait until all objects in the queue have been processed and then cancel its tasks.
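A minimal usage sketch of that behaviour (how objects actually reach the sink's queue, for example via a Worker, is left out here):

async def main():
    async with LoggingSink(num_tasks=1) as sink:
        ...  # while inside the block, the sink's tasks consume its asyncio.Queue
    # on exit the sink should drain the queue and then cancel its tasks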



Finally, to tie it all together, I made a Worker class. It accepts one endpoint and several sinks. For instance:



worker = Worker(endpoint=posts, sinks=[loggingsink, mongosink])
worker.run()


run executes the worker's pipeline with the event loop's run_until_complete. The worker also has a transform method.
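I read that as analogous to Sink.transform; here is a sketch of overriding it, where subclassing is my assumption:

class PostsWorker(Worker):
    def transform(self, obj):
        # reshape each crawled object before it is handed to the sinks
        return {"id": obj["id"], "category": obj.url.params["category"]}

worker = PostsWorker(endpoint=posts, sinks=[sink])
worker.run()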



There is also a WorkerGroup class that lets you create several workers at once and run them together with asyncio.gather.
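Something along these lines, though the constructor signature, the worker names, and the method names here are my assumptions:

group = WorkerGroup([categories_worker, posts_worker])
group.run()   # gathers all the workers' pipelines with asyncio.gather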



The code also contains an example server that generates data with faker, along with handlers for its endpoints; I think that is the clearest way to see how everything fits together.



All of this is at an early stage of development, and so far I have changed the API quite often. But now it seems to have settled into the shape it should have. I would welcome pull requests and comments on my code.


