
Commit

refactor github fetcher
yujonglee committed Sep 18, 2024
1 parent 019d53e commit c75e70b
Showing 3 changed files with 146 additions and 198 deletions.
127 changes: 34 additions & 93 deletions core/lib/canary/sources/github_discussion_fetcher.ex
@@ -32,117 +32,58 @@ defmodule Canary.Sources.GithubDiscussion.Fetcher do

alias Canary.Sources.Source
alias Canary.Sources.GithubDiscussion

defp client() do
Canary.graphql_client(
url: "https://api.github.com/graphql",
auth: {:bearer, System.get_env("GITHUB_API_KEY")}
)
end
alias Canary.Sources.GithubFetcher

def run(%Source{
config: %Ash.Union{type: :github_discussion, value: %GithubDiscussion.Config{} = config}
}) do
{:ok, fetch_all(config.owner, config.repo)}
end

def fetch_all(owner, repo) do
Stream.unfold(nil, fn
:stop ->
nil

cursor ->
case fetch_page(owner, repo, cursor) do
{:ok, data} ->
page_info = data["repository"]["discussions"]["pageInfo"]
nodes = data["repository"]["discussions"]["nodes"]

if page_info["hasNextPage"] do
{nodes, page_info["endCursor"]}
else
{nodes, :stop}
end

{:try_after_s, seconds} ->
Process.sleep(seconds * 1000)
{[], cursor}

{:error, _} ->
{[], :stop}
end
end)
|> Stream.flat_map(fn nodes -> Enum.map(nodes, &transform_discussion_node/1) end)
|> Enum.to_list()
end

def fetch_page(owner, repo, cursor) do
result =
client()
|> Req.post(
graphql:
{"""
query ($owner: String!, $repo: String!, $discussion_n: Int!, $comment_n: Int!, $cursor: String) {
repository(owner: $owner, name: $repo) {
discussions(first: $discussion_n, orderBy: {field: UPDATED_AT, direction: DESC}, after: $cursor) {
pageInfo {
endCursor
hasNextPage
}
query = """
query ($owner: String!, $repo: String!, $discussion_n: Int!, $comment_n: Int!, $cursor: String) {
repository(owner: $owner, name: $repo) {
discussions(first: $discussion_n, orderBy: {field: UPDATED_AT, direction: DESC}, after: $cursor) {
pageInfo {
endCursor
hasNextPage
}
nodes {
id
url
author {
login
avatarUrl
}
upvoteCount
title
body
closed
isAnswered
createdAt
comments(last: $comment_n) {
nodes {
id
url
author {
login
avatarUrl
}
title
body
closed
isAnswered
createdAt
comments(last: $comment_n) {
nodes {
id
url
author {
login
avatarUrl
}
body
}
}
}
}
}
}
""",
%{
discussion_n: @default_discussion_n,
comment_n: @default_comment_n,
repo: repo,
owner: owner,
cursor: cursor
}}
)

case result do
{:ok, %{status: 200, body: %{"data" => data}}} ->
{:ok, data}

# https://docs.github.com/en/graphql/overview/rate-limits-and-node-limits-for-the-graphql-api#exceeding-the-rate-limit
{:ok, %{status: 403, headers: headers}} ->
if headers["x-ratelimit-remaining"] == "0" and length(headers["x-ratelimit-reset"]) == 1 do
{:try_after_s, String.to_integer(Enum.at(headers["x-ratelimit-reset"], 0))}
else
{:try_after_s, 60}
end
}
}
"""

{:ok, %{status: 200, body: %{"errors" => errors}}} ->
{:error, errors}
variables = %{
owner: config.owner,
repo: config.repo,
discussion_n: @default_discussion_n,
comment_n: @default_comment_n
}

{:error, error} ->
{:error, error}
end
nodes = GithubFetcher.run_all(query, variables)
{:ok, Enum.map(nodes, &transform_discussion_node/1)}
end

defp transform_discussion_node(discussion) do
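
For illustration only (not part of this commit), a minimal sketch of how the refactored discussion fetcher could be invoked, assuming a %Source{} and its github_discussion config union can be built directly as plain structs; the owner and repo values are placeholders:

# Hypothetical usage sketch, not committed code.
source = %Canary.Sources.Source{
  config: %Ash.Union{
    type: :github_discussion,
    value: %Canary.Sources.GithubDiscussion.Config{owner: "some-owner", repo: "some-repo"}
  }
}

# run/1 builds the discussions query, delegates paging and rate-limit retries to
# GithubFetcher.run_all/2, then maps each node through transform_discussion_node/1.
{:ok, discussions} = Canary.Sources.GithubDiscussion.Fetcher.run(source)
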
67 changes: 67 additions & 0 deletions core/lib/canary/sources/github_fetcher.ex
@@ -0,0 +1,67 @@
defmodule Canary.Sources.GithubFetcher do
  defp client() do
    Canary.graphql_client(
      url: "https://api.github.com/graphql",
      auth: {:bearer, System.get_env("GITHUB_API_KEY")}
    )
  end

  def run_all(query, variables) do
    Stream.unfold(nil, fn
      :stop ->
        nil

      cursor ->
        case run(query, Map.put(variables, :cursor, cursor)) do
          {:ok, data} ->
            resource = get_resource(data)
            page_info = data["repository"][resource]["pageInfo"]
            nodes = data["repository"][resource]["nodes"]

            if page_info["hasNextPage"] do
              {nodes, page_info["endCursor"]}
            else
              {nodes, :stop}
            end

          {:try_after_s, seconds} ->
            Process.sleep(seconds * 1000)
            {[], cursor}

          {:error, errors} ->
            IO.inspect(errors)
            {[], :stop}
        end
    end)
    |> Stream.flat_map(& &1)
    |> Enum.to_list()
  end

  def run(query, variables) do
    case client() |> Req.post(graphql: {query, variables}) do
      {:ok, %{status: 200, body: %{"data" => data}}} ->
        {:ok, data}

      {:ok, %{status: 403, headers: headers}} ->
        if headers["x-ratelimit-remaining"] == "0" and length(headers["x-ratelimit-reset"]) == 1 do
          {:try_after_s, String.to_integer(Enum.at(headers["x-ratelimit-reset"], 0))}
        else
          {:try_after_s, 60}
        end

      {:ok, %{status: 200, body: %{"errors" => errors}}} ->
        {:error, errors}

      {:error, error} ->
        {:error, error}
    end
  end

  defp get_resource(data) do
    cond do
      Map.has_key?(data["repository"], "issues") -> "issues"
      Map.has_key?(data["repository"], "discussions") -> "discussions"
      true -> raise "Unknown resource"
    end
  end
end
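
A minimal sketch of calling the new module directly (illustrative, not part of the commit). run_all/2 injects :cursor into the variables on every page, and get_resource/1 only recognizes a top-level "issues" or "discussions" field; this assumes GITHUB_API_KEY is set, and the owner, repo, and selection set below are placeholders:

query = """
query ($owner: String!, $repo: String!, $issue_n: Int!, $cursor: String) {
  repository(owner: $owner, name: $repo) {
    issues(first: $issue_n, after: $cursor) {
      pageInfo {
        endCursor
        hasNextPage
      }
      nodes {
        title
      }
    }
  }
}
"""

# Pages until hasNextPage is false, sleeping and retrying when rate limited.
nodes = Canary.Sources.GithubFetcher.run_all(query, %{owner: "some-owner", repo: "some-repo", issue_n: 50})
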
150 changes: 45 additions & 105 deletions core/lib/canary/sources/github_issue_fetcher.ex
@@ -30,114 +30,54 @@ defmodule Canary.Sources.GithubIssue.Fetcher do

alias Canary.Sources.Source
alias Canary.Sources.GithubIssue

defp client() do
Canary.graphql_client(
url: "https://api.github.com/graphql",
auth: {:bearer, System.get_env("GITHUB_API_KEY")}
)
end
alias Canary.Sources.GithubFetcher

def run(%Source{config: %Ash.Union{type: :github_issue, value: %GithubIssue.Config{} = config}}) do
{:ok, fetch_all(config.owner, config.repo)}
end

defp fetch_all(owner, repo) do
Stream.unfold(nil, fn
:stop ->
nil

cursor ->
case fetch_page(owner, repo, cursor) do
{:ok, data} ->
page_info = data["repository"]["issues"]["pageInfo"]
nodes = data["repository"]["issues"]["nodes"]

if page_info["hasNextPage"] do
{nodes, page_info["endCursor"]}
else
{nodes, :stop}
end

{:try_after_s, seconds} ->
Process.sleep(seconds * 1000)
{[], cursor}

{:error, _} ->
{[], :stop}
end
end)
|> Stream.flat_map(fn nodes -> Enum.map(nodes, &transform_issue_node/1) end)
|> Enum.to_list()
end

defp fetch_page(owner, repo, cursor) do
result =
client()
|> Req.post(
graphql:
{"""
query ($owner: String!, $repo: String!, $issue_n: Int!, $comment_n: Int!, $cursor: String) {
repository(owner: $owner, name: $repo) {
issues(first: $issue_n, orderBy: {field: UPDATED_AT, direction: DESC}, after: $cursor) {
pageInfo {
endCursor
hasNextPage
}
nodes {
id
bodyUrl
author {
login
avatarUrl
}
title
body
closed
createdAt
comments(last: $comment_n) {
nodes {
id
url
author {
login
avatarUrl
}
body
}
}
}
}
}
}
""",
%{
issue_n: @default_issue_n,
comment_n: @default_comment_n,
repo: repo,
owner: owner,
cursor: cursor
}}
)

case result do
{:ok, %{status: 200, body: %{"data" => data}}} ->
{:ok, data}

# https://docs.github.com/en/graphql/overview/rate-limits-and-node-limits-for-the-graphql-api#exceeding-the-rate-limit
{:ok, %{status: 403, headers: headers}} ->
if headers["x-ratelimit-remaining"] == "0" and length(headers["x-ratelimit-reset"]) == 1 do
{:try_after_s, String.to_integer(Enum.at(headers["x-ratelimit-reset"], 0))}
else
{:try_after_s, 60}
end

{:ok, %{status: 200, body: %{"errors" => errors}}} ->
{:error, errors}
query = """
query ($owner: String!, $repo: String!, $issue_n: Int!, $comment_n: Int!, $cursor: String) {
repository(owner: $owner, name: $repo) {
issues(first: $issue_n, orderBy: {field: UPDATED_AT, direction: DESC}, after: $cursor) {
pageInfo {
endCursor
hasNextPage
}
nodes {
id
bodyUrl
author {
login
avatarUrl
}
title
body
closed
createdAt
comments(last: $comment_n) {
nodes {
id
url
author {
login
avatarUrl
}
body
}
}
}
}
}
}
"""

variables = %{
owner: config.owner,
repo: config.repo,
issue_n: @default_issue_n,
comment_n: @default_comment_n
}

{:error, error} ->
{:error, error}
end
nodes = GithubFetcher.run_all(query, variables)
{:ok, Enum.map(nodes, &transform_issue_node/1)}
end

defp transform_issue_node(issue) do
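
For reference, the shape of a single issue node that transform_issue_node/1 receives, as implied by the selection set above; every value here is a made-up example:

%{
  "id" => "I_kwDOExample",
  "bodyUrl" => "https://github.com/some-owner/some-repo/issues/1",
  "author" => %{"login" => "octocat", "avatarUrl" => "https://example.com/avatar.png"},
  "title" => "Example issue title",
  "body" => "Example issue body",
  "closed" => false,
  "createdAt" => "2024-09-18T00:00:00Z",
  "comments" => %{
    "nodes" => [
      %{
        "id" => "IC_kwDOExample",
        "url" => "https://github.com/some-owner/some-repo/issues/1#issuecomment-1",
        "author" => %{"login" => "octocat", "avatarUrl" => "https://example.com/avatar.png"},
        "body" => "Example comment body"
      }
    ]
  }
}
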
