In this article, I'll show you how to build a GraphQL server using Rust and its ecosystem; examples of the implementation of the most common tasks in the development of the GraphQL API will be given. As a result, the APIs of the three microservices will be combined into a single access point using Apollo Server and Apollo Federation . This will allow clients to request data from multiple sources at the same time without having to know which data is coming from which service.
Introduction
Overview
In terms of functionality, the project described is quite similar to the one presented in my previous article , but this time using the Rust stack. Architecturally, the project looks like this:
Each component of the architecture sheds light on several issues that can arise when implementing the GraphQL API. The domain model includes data on the planets of the solar system and their satellites. The project has a multi-module structure (or mono-repository) and consists of the following modules:
planets-service (Rust)
satellites-service (Rust)
auth-service (Rust)
apollo-server (JS)
GraphQL Rust: Juniper Async-graphql, Apollo Federation, ( Federation Juniper). code-first .
PostgreSQL โ , JWT โ Kafka โ .
, :
|
|
|
GitHub |
---|---|---|---|
|
Rust |
|
|
GraphQL |
Async-graphql |
|
|
GraphQL |
Apollo Server |
|
|
Web |
actix-web |
|
|
|
PostgreSQL |
|
|
|
Apache Kafka |
|
|
|
Docker Compose |
|
|
Rust :
|
|
|
GitHub |
---|---|---|---|
ORM |
Diesel |
|
|
Kafka |
rust-rdkafka |
|
|
|
argonautica |
|
|
JWT |
jsonwebtoken |
|
|
|
Testcontainers-rs |
|
|
, Docker Compose. :
-
Diesel CLI (
cargo install diesel_cli --no-default-features --features postgres
)
LLVM (
argonautica
)
CMake (
rust-rdkafka
)
-
-
-
Cargo.toml
:
Root Cargo.toml
[workspace]
members = [
"auth-service",
"planets-service",
"satellites-service",
"common-utils",
]
planets-service
.
Cargo.toml
:
[package]
name = "planets-service"
version = "0.1.0"
edition = "2018"
[dependencies]
common-utils = { path = "../common-utils" }
async-graphql = "2.4.3"
async-graphql-actix-web = "2.4.3"
actix-web = "3.3.2"
actix-rt = "1.1.1"
actix-web-actors = "3.0.0"
futures = "0.3.8"
async-trait = "0.1.42"
bigdecimal = { version = "0.1.2", features = ["serde"] }
serde = { version = "1.0.118", features = ["derive"] }
serde_json = "1.0.60"
diesel = { version = "1.4.5", features = ["postgres", "r2d2", "numeric"] }
diesel_migrations = "1.4.0"
dotenv = "0.15.0"
strum = "0.20.0"
strum_macros = "0.20.1"
rdkafka = { version = "0.24.0", features = ["cmake-build"] }
async-stream = "0.3.0"
lazy_static = "1.4.0"
[dev-dependencies]
jsonpath_lib = "0.2.6"
testcontainers = "0.9.1"
async-graphql
โ GraphQL , actix-web
โ web , async-graphql-actix-web
.
main.rs
:
#[actix_rt::main]
async fn main() -> std::io::Result<()> {
dotenv().ok();
let pool = create_connection_pool();
run_migrations(&pool);
let schema = create_schema_with_context(pool);
HttpServer::new(move || App::new()
.configure(configure_service)
.data(schema.clone())
)
.bind("0.0.0.0:8001")?
.run()
.await
}
HTTP , lib.rs
:
pub fn configure_service(cfg: &mut web::ServiceConfig) {
cfg
.service(web::resource("/")
.route(web::post().to(index))
.route(web::get().guard(guard::Header("upgrade", "websocket")).to(index_ws))
.route(web::get().to(index_playground))
);
}
async fn index(schema: web::Data, http_req: HttpRequest, req: Request) -> Response {
let mut query = req.into_inner();
let maybe_role = common_utils::get_role(http_req);
if let Some(role) = maybe_role {
query = query.data(role);
}
schema.execute(query).await.into()
}
async fn index_ws(schema: web::Data, req: HttpRequest, payload: web::Payload) -> Result {
WSSubscription::start(Schema::clone(&*schema), &req, payload)
}
async fn index_playground() -> HttpResponse {
HttpResponse::Ok()
.content_type("text/html; charset=utf-8")
.body(playground_source(GraphQLPlaygroundConfig::new("/").subscription_endpoint("/")))
}
pub fn create_schema_with_context(pool: PgPool) -> Schema {
let arc_pool = Arc::new(pool);
let cloned_pool = Arc::clone(&arc_pool);
let details_batch_loader = Loader::new(DetailsBatchLoader {
pool: cloned_pool
}).with_max_batch_size(10);
let kafka_consumer_counter = Mutex::new(0);
Schema::build(Query, Mutation, Subscription)
.data(arc_pool)
.data(details_batch_loader)
.data(kafka::create_producer())
.data(kafka_consumer_counter)
.finish()
}
:
index
โ GraphQL (query)
-
index_playground
โ Playground GraphQL IDE
create_schema_with_context
โ GraphQL , ,
GraphQL
:
#[Object]
impl Query {
async fn get_planets(&self, ctx: &Context<'_>) -> Vec {
repository::get_all(&get_conn_from_ctx(ctx)).expect("Can't get planets")
.iter()
.map(|p| { Planet::from(p) })
.collect()
}
async fn get_planet(&self, ctx: &Context<'_>, id: ID) -> Option {
find_planet_by_id_internal(ctx, id)
}
#[graphql(entity)]
async fn find_planet_by_id(&self, ctx: &Context<'_>, id: ID) -> Option {
find_planet_by_id_internal(ctx, id)
}
}
fn find_planet_by_id_internal(ctx: &Context<'_>, id: ID) -> Option {
let id = id.to_string().parse::().expect("Can't get id from String");
repository::get(id, &get_conn_from_ctx(ctx)).ok()
.map(|p| { Planet::from(&p) })
}
. GraphQL DTO ( ). get_planets
get_planet
GraphQL IDE :
{
getPlanets {
name
type
}
}
Planet
:
#[derive(Serialize, Deserialize)]
struct Planet {
id: ID,
name: String,
planet_type: PlanetType,
}
#[Object]
impl Planet {
async fn id(&self) -> &ID {
&self.id
}
async fn name(&self) -> &String {
&self.name
}
/// From an astronomical point of view
#[graphql(name = "type")]
async fn planet_type(&self) -> &PlanetType {
&self.planet_type
}
#[graphql(deprecation = "Now it is not in doubt. Do not use this field")]
async fn is_rotating_around_sun(&self) -> bool {
true
}
async fn details(&self, ctx: &Context<'_>) -> Details {
let loader = ctx.data::>().expect("Can't get loader");
let planet_id = self.id.to_string().parse::().expect("Can't convert id");
loader.load(planet_id).await
}
}
impl
. ( Rust ) deprecation reason. GraphQL IDE.
N+1
Planet.details
N+1, , :
GraphQL
{
getPlanets {
name
details {
meanRadius
}
}
}
details
SQL , . . Details
โ Planet
.
DataLoader, Async-graphql, details
:
async fn details(&self, ctx: &Context<'_>) -> Result {
let data_loader = ctx.data::>().expect("Can't get data loader");
let planet_id = self.id.to_string().parse::().expect("Can't convert id");
let details = data_loader.load_one(planet_id).await?;
details.ok_or_else(|| "Not found".into())
}
data_loader
โ , :
let details_data_loader = DataLoader::new(DetailsLoader {
pool: cloned_pool
}).max_batch_size(10);
DetailsLoader
:
pub struct DetailsLoader {
pub pool: Arc
}
#[async_trait::async_trait]
impl Loader for DetailsLoader {
type Value = Details;
type Error = Error;
async fn load(&self, keys: &[i32]) -> Result, Self::Error> {
let conn = self.pool.get().expect("Can't get DB connection");
let details = repository::get_details(keys, &conn).expect("Can't get planets' details");
Ok(details.iter()
.map(|details_entity| (details_entity.planet_id, Details::from(details_entity)))
.collect::>())
}
}
N+1, . . DetailsLoader.load
SQL , DetailsEntity
.
GraphQL :
#[derive(Interface, Clone)]
#[graphql(
field(name = "mean_radius", type = "&CustomBigDecimal"),
field(name = "mass", type = "&CustomBigInt"),
)]
pub enum Details {
InhabitedPlanetDetails(InhabitedPlanetDetails),
UninhabitedPlanetDetails(UninhabitedPlanetDetails),
}
#[derive(SimpleObject, Clone)]
pub struct InhabitedPlanetDetails {
mean_radius: CustomBigDecimal,
mass: CustomBigInt,
/// In billions
population: CustomBigDecimal,
}
#[derive(SimpleObject, Clone)]
pub struct UninhabitedPlanetDetails {
mean_radius: CustomBigDecimal,
mass: CustomBigInt,
}
, "" , SimpleObject
.
. ; (. . - orphan rule). :
#[derive(Clone)]
pub struct CustomBigInt(BigDecimal);
#[Scalar(name = "BigInt")]
impl ScalarType for CustomBigInt {
fn parse(value: Value) -> InputValueResult {
match value {
Value::String(s) => {
let parsed_value = BigDecimal::from_str(&s)?;
Ok(CustomBigInt(parsed_value))
}
_ => Err(InputValueError::expected_type(value)),
}
}
fn to_value(&self) -> Value {
Value::String(format!("{:e}", &self))
}
}
impl LowerExp for CustomBigInt {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let val = &self.0.to_f64().expect("Can't convert BigDecimal");
LowerExp::fmt(val, f)
}
}
#[derive(Clone)]
pub struct CustomBigDecimal(BigDecimal);
#[Scalar(name = "BigDecimal")]
impl ScalarType for CustomBigDecimal {
fn parse(value: Value) -> InputValueResult {
match value {
Value::String(s) => {
let parsed_value = BigDecimal::from_str(&s)?;
Ok(CustomBigDecimal(parsed_value))
}
_ => Err(InputValueError::expected_type(value)),
}
}
fn to_value(&self) -> Value {
Value::String(self.0.to_string())
}
}
, .
:
pub struct Mutation;
#[Object]
impl Mutation {
#[graphql(guard(RoleGuard(role = "Role::Admin")))]
async fn create_planet(&self, ctx: &Context<'_>, planet: PlanetInput) -> Result {
let new_planet = NewPlanetEntity {
name: planet.name,
planet_type: planet.planet_type.to_string(),
};
let details = planet.details;
let new_planet_details = NewDetailsEntity {
mean_radius: details.mean_radius.0,
mass: BigDecimal::from_str(&details.mass.0.to_string()).expect("Can't get BigDecimal from string"),
population: details.population.map(|wrapper| { wrapper.0 }),
planet_id: 0,
};
let created_planet_entity = repository::create(new_planet, new_planet_details, &get_conn_from_ctx(ctx))?;
let producer = ctx.data::().expect("Can't get Kafka producer");
let message = serde_json::to_string(&Planet::from(&created_planet_entity)).expect("Can't serialize a planet");
kafka::send_message(producer, message).await;
Ok(Planet::from(&created_planet_entity))
}
}
, :
#[derive(InputObject)]
struct PlanetInput {
name: String,
#[graphql(name = "type")]
planet_type: PlanetType,
details: DetailsInput,
}
RoleGuard
', Admin
. , , , :
mutation {
createPlanet(
planet: {
name: "test_planet"
type: TERRESTRIAL_PLANET
details: { meanRadius: "10.5", mass: "8.8e24", population: "0.5" }
}
) {
id
}
}
Authorization
JWT, auth-service
( ).
:
let producer = ctx.data::().expect("Can't get Kafka producer");
let message = serde_json::to_string(&Planet::from(&created_planet_entity)).expect("Can't serialize a planet");
kafka::send_message(producer, message).await;
API , Kafka consumer:
pub struct Subscription;
#[Subscription]
impl Subscription {
async fn latest_planet<'ctx>(&self, ctx: &'ctx Context<'_>) -> impl Stream + 'ctx {
let kafka_consumer_counter = ctx.data::>().expect("Can't get Kafka consumer counter");
let consumer_group_id = kafka::get_kafka_consumer_group_id(kafka_consumer_counter);
let consumer = kafka::create_consumer(consumer_group_id);
async_stream::stream! {
let mut stream = consumer.start();
while let Some(value) = stream.next().await {
yield match value {
Ok(message) => {
let payload = message.payload().expect("Kafka message should contain payload");
let message = String::from_utf8_lossy(payload).to_string();
serde_json::from_str(&message).expect("Can't deserialize a planet")
}
Err(e) => panic!("Error while Kafka message processing: {}", e)
};
}
}
}
}
, :
subscription {
latestPlanet {
id
name
type
details {
meanRadius
}
}
}
ws://localhost:8001
.
:
#[actix_rt::test]
async fn test_get_planets() {
let docker = Cli::default();
let (_pg_container, pool) = common::setup(&docker);
let mut service = test::init_service(App::new()
.configure(configure_service)
.data(create_schema_with_context(pool))
).await;
let query = "
{
getPlanets {
id
name
type
details {
meanRadius
mass
... on InhabitedPlanetDetails {
population
}
}
}
}
".to_string();
let request_body = GraphQLCustomRequest {
query,
variables: Map::new(),
};
let request = test::TestRequest::post().uri("/").set_json(&request_body).to_request();
let response: GraphQLCustomResponse = test::read_response_json(&mut service, request).await;
fn get_planet_as_json(all_planets: &serde_json::Value, index: i32) -> &serde_json::Value {
jsonpath::select(all_planets, &format!("$.getPlanets[{}]", index)).expect("Can't get planet by JSON path")[0]
}
let mercury_json = get_planet_as_json(&response.data, 0);
common::check_planet(mercury_json, 1, "Mercury", "TERRESTRIAL_PLANET", "2439.7");
let earth_json = get_planet_as_json(&response.data, 2);
common::check_planet(earth_json, 3, "Earth", "TERRESTRIAL_PLANET", "6371.0");
let neptune_json = get_planet_as_json(&response.data, 7);
common::check_planet(neptune_json, 8, "Neptune", "ICE_GIANT", "24622.0");
}
const PLANET_FRAGMENT: &str = "
fragment planetFragment on Planet {
id
name
type
details {
meanRadius
mass
... on InhabitedPlanetDetails {
population
}
}
}
";
#[actix_rt::test]
async fn test_get_planet_by_id() {
...
let query = "
{
getPlanet(id: 3) {
... planetFragment
}
}
".to_string() + PLANET_FRAGMENT;
let request_body = GraphQLCustomRequest {
query,
variables: Map::new(),
};
...
}
#[actix_rt::test]
async fn test_get_planet_by_id_with_variable() {
...
let query = "
query testPlanetById($planetId: String!) {
getPlanet(id: $planetId) {
... planetFragment
}
}".to_string() + PLANET_FRAGMENT;
let jupiter_id = 5;
let mut variables = Map::new();
variables.insert("planetId".to_string(), jupiter_id.into());
let request_body = GraphQLCustomRequest {
query,
variables,
};
...
}
Testcontainers-rs
, , , PostgreSQL.
GraphQL API
GraphQL API. , , graphql-client, .
API
Satellite
planet
, :
{
getPlanet(id: "1") {
satellites {
planet {
satellites {
planet {
satellites {
... # more deep nesting!
}
}
}
}
}
}
}
:
pub fn create_schema_with_context(pool: PgPool) -> Schema {
...
Schema::build(Query, Mutation, Subscription)
.limit_depth(3)
.limit_complexity(15)
...
}
, GraphQL IDE. , IDE introspection query, .
auth-service
argonautica
jsonwebtoken
. Argon2. ; , .
:
pub struct Mutation;
#[Object]
impl Mutation {
async fn sign_in(&self, ctx: &Context<'_>, input: SignInInput) -> Result {
let maybe_user = repository::get_user(&input.username, &get_conn_from_ctx(ctx)).ok();
if let Some(user) = maybe_user {
if let Ok(matching) = verify_password(&user.hash, &input.password) {
if matching {
let role = AuthRole::from_str(user.role.as_str()).expect("Can't convert &str to AuthRole");
return Ok(common_utils::create_token(user.username, role));
}
}
}
Err(Error::new("Can't authenticate a user"))
}
}
#[derive(InputObject)]
struct SignInInput {
username: String,
password: String,
}
verify_password
utils
, create_token
common_utils
. , sign_in
JWT, .
JWT :
JWT
mutation {
signIn(input: { username: "john_doe", password: "password" })
}
john_doe/password. JWT (. ).
, HTTP Authorization: Bearer $JWT
. index
HTTP GraphQL /:
async fn index(schema: web::Data, http_req: HttpRequest, req: Request) -> Response {
let mut query = req.into_inner();
let maybe_role = common_utils::get_role(http_req);
if let Some(role) = maybe_role {
query = query.data(role);
}
schema.execute(query).await.into()
}
create_planet
:
#[graphql(guard(RoleGuard(role = "Role::Admin")))]
:
struct RoleGuard {
role: Role,
}
#[async_trait::async_trait]
impl Guard for RoleGuard {
async fn check(&self, ctx: &Context<'_>) -> Result<()> {
if ctx.data_opt::() == Some(&self.role) {
Ok(())
} else {
Err("Forbidden".into())
}
}
}
, , "Forbidden".
GraphQL :
#[derive(SimpleObject)]
struct Satellite {
...
life_exists: LifeExists,
}
#[derive(Copy, Clone, Eq, PartialEq, Debug, Enum, EnumString)]
#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]
pub enum LifeExists {
Yes,
OpenQuestion,
NoData,
}
Async-graphql / chrono
, :
#[derive(SimpleObject)]
struct Satellite {
...
first_spacecraft_landing_date: Option,
}
Apollo Federation
satellites-service
โ GraphQL (Planet
) ( ) Apollo Server.
Planet
planets-service
:
#[derive(Serialize, Deserialize)]
struct Planet {
id: ID,
name: String,
planet_type: PlanetType,
}
planets-service
Planet
:
#[Object]
impl Query {
#[graphql(entity)]
async fn find_planet_by_id(&self, ctx: &Context<'_>, id: ID) -> Option {
find_planet_by_id_internal(ctx, id)
}
}
satellites-service
Planet
satellites
:
struct Planet {
id: ID
}
#[Object(extends)]
impl Planet {
#[graphql(external)]
async fn id(&self) -> &ID {
&self.id
}
async fn satellites(&self, ctx: &Context<'_>) -> Vec {
let id = self.id.to_string().parse::().expect("Can't get id from String");
repository::get_by_planet_id(id, &get_conn_from_ctx(ctx)).expect("Can't get satellites of planet")
.iter()
.map(|e| { Satellite::from(e) })
.collect()
}
}
. Planet
:
#[Object]
impl Query {
#[graphql(entity)]
async fn get_planet_by_id(&self, id: ID) -> Planet {
Planet { id }
}
}
Async-graphql (_service
and _entities
), Apollo Server'. โ , API Apollo Server'. , Apollo Federation - .
Apollo Server
Apollo Server Apollo Federation :
GraphQL API
, , frontend , .
GraphQL , schema stitching, .
:
{
"name": "api-gateway",
"main": "gateway.js",
"scripts": {
"start-gateway": "nodemon gateway.js"
},
"devDependencies": {
"concurrently": "5.3.0",
"nodemon": "2.0.6"
},
"dependencies": {
"@apollo/gateway": "0.21.3",
"apollo-server": "2.19.0",
"graphql": "15.4.0"
}
}
const {ApolloServer} = require("apollo-server");
const {ApolloGateway, RemoteGraphQLDataSource} = require("@apollo/gateway");
class AuthenticatedDataSource extends RemoteGraphQLDataSource {
willSendRequest({request, context}) {
if (context.authHeaderValue) {
request.http.headers.set('Authorization', context.authHeaderValue);
}
}
}
let node_env = process.env.NODE_ENV;
function get_service_url(service_name, port) {
let host;
switch (node_env) {
case 'docker':
host = service_name;
break;
case 'local': {
host = 'localhost';
break
}
}
return "http://" + host + ":" + port;
}
const gateway = new ApolloGateway({
serviceList: [
{name: "planets-service", url: get_service_url("planets-service", 8001)},
{name: "satellites-service", url: get_service_url("satellites-service", 8002)},
{name: "auth-service", url: get_service_url("auth-service", 8003)},
],
buildService({name, url}) {
return new AuthenticatedDataSource({url});
},
});
const server = new ApolloServer({
gateway, subscriptions: false, context: ({req}) => ({
authHeaderValue: req.headers.authorization
})
});
server.listen({host: "0.0.0.0", port: 4000}).then(({url}) => {
console.log(`? Server ready at ${url}`);
});
, .
apollo-service
, Rust ( Authorization
).
, , Apollo Server, Federation; , .
:
PostgreSQL and Diesel. Docker , diesel setup
, . , , .
API
, :
Docker Compose (docker-compose.yml)
, , :
( )
docker-compose up
production mode ( )
docker-compose -f docker-compose.yml up
Docker
Rust
cargo run
, Apollo Server:
cd
apollo-server
NODE_ENV
, ,set NODE_ENV=local
( Windows)
npm install
npm run start-gateway
apollo-server
:
Apollo Server
[nodemon] 2.0.6
[nodemon] to restart at any time, enter `rs`
[nodemon] watching path(s): *.*
[nodemon] watching extensions: js,mjs,json
[nodemon] starting `node gateway.js`
Server ready at http://0.0.0.0:4000/
http://localhost:4000
Playground IDE:
, , . , Playground IDE.
, , GraphQL IDE; :
subscription {
latestPlanet {
name
type
}
}
Authorization
:
mutation {
createPlanet(
planet: {
name: "Pluto"
type: DWARF_PLANET
details: { meanRadius: "1188", mass: "1.303e22" }
}
) {
id
}
}
:
CI/CD
CI/CD GitHub Actions (workflow), , Docker Google Cloud Platform.
: "" , .
, GraphQL API Rust. API Rust GraphQL GraphQL ; . Apollo Server, Apollo Federation Async-graphql. GitHub. , . !