-
Notifications
You must be signed in to change notification settings - Fork 113
/
async.rb
159 lines (144 loc) · 5.27 KB
/
async.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# frozen_string_literal: true
module Telegram
module Bot
# Telegram clients can perform requests in async way with
# any job adapter (ActiveJob by default). Using Rails you don't need any
# additional configuration. However you may want to enable async requests
# by default with `async: true` in `secrets.yml`.
#
# telegram:
# bots:
# chat_async:
# token: secret
# async: true # enable async mode for client
#
# Without Rails To start using async requests
# initialize client with `id` kwarg and make sure the client is
# accessible via `Teletgram.bots[id]` in job worker. Or just use
# `Telegram.bots_config=` for configuration.
#
# Being in async mode `#request` enqueues job to perform
# http request instead of performing it immediately.
# Async behavior is controlled with `#async=` writer
# and can be enabled/disabled for the block with `#async`:
#
# client = Telegram::Bot::Client.new(**config, async: true)
# client.send_message(message)
# client.async(false) { client.send_message(other_one) }
#
# `#async=` sets global value for all threads,
# while `#async(val, &block)` is thread-safe.
#
# It can be set with custom job class or classname. By default it defines
# job classes inherited from ApplicationJob, which
# can be accessed via `.default_async_job`. You can integrate it with any
# other job provider by defining a class with `.perform_later(bot_id, *args)`
# method. See Async::Job for implemetation.
module Async
# Used to track missing key in a hash in local variable.
MISSING_VALUE = Object.new.freeze
module Job
class << self
def included(base)
base.singleton_class.send :attr_accessor, :client_class
end
end
def perform(client_id, *args)
client = self.class.client_class.wrap(client_id.to_sym)
client.async(false) { client.request(*args) }
end
end
module ClassMethods
def default_async_job
@default_async_job ||= begin
begin
ApplicationJob
rescue NameError
raise 'Define ApplicationJob class or setup #async= with custom job class'
end
klass = Class.new(ApplicationJob) { include Job }
klass.client_class = self
const_set(:AsyncJob, klass)
end
end
# This is used in specs.
def default_async_job=(val)
@default_async_job = val
remove_const(:AsyncJob) if const_defined?(:AsyncJob, false)
end
# Prepares argments for async job. ActiveJob doesn't support
# Symbol in argumens. Also we can encode json bodies only once here,
# so it would not be unnecessarily serialized-deserialized.
#
# This is stub method, which returns input. Every client class
# must prepare args itself.
def prepare_async_args(*args)
args
end
# Returns default_async_job if `true` is given,
# treats String as a constant name, or bypasses any other values.
def prepare_async_val(val)
case val
when true then default_async_job
when String then Object.const_get(val)
else val
end
end
end
class << self
def prepended(base)
base.extend(ClassMethods)
end
# Transforms symbols to strings in hash values.
def prepare_hash(hash)
return hash unless hash.is_a?(Hash)
hash = hash.dup
hash.each { |key, val| hash[key] = val.to_s if val.is_a?(Symbol) }
end
# Thread-local hash to store async config for every client.
def thread_store
Thread.current[:telegram_bot_async] ||= {}
end
end
attr_reader :id
def initialize(*, id: nil, async: nil, **options)
@id = id
self.async = async
super
end
# Sets default async value for all threads.
# Uses `self.class.prepare_async_val` to prepare value.
def async=(val)
@async = self.class.prepare_async_val(val)
end
# Sets async value in a thread-safe way for the block.
# Uses `self.class.prepare_async_val` to prepare value.
#
# If no block is given returns previously set value or the global one,
# set by #async=.
def async(val = true) # rubocop:disable Style/OptionalBooleanParameter
thread_key = object_id
thread_store = Async.thread_store
return thread_store.fetch(thread_key) { @async } unless block_given?
begin
old_val = thread_store.fetch(thread_key) { MISSING_VALUE }
thread_store[thread_key] = self.class.prepare_async_val(val)
yield
ensure
if old_val == MISSING_VALUE
thread_store.delete(thread_key)
else
thread_store[thread_key] = old_val
end
end
end
# Uses job if #async is set.
def request(*args)
job_class = async
return super unless job_class
raise 'Can not enqueue job without client id' unless id
job_class.perform_later(id.to_s, *self.class.prepare_async_args(*args))
end
end
end
end