Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support cohort targeting for local evaluation #68

Merged
merged 31 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
24133c7
add topological sort and tests
tyiuhc Jul 18, 2024
c171e07
add translations and tests
tyiuhc Jul 19, 2024
de6fdbc
fix tests and implemetation
tyiuhc Jul 22, 2024
669ab1a
fix lint
tyiuhc Jul 22, 2024
18152de
fix tests
tyiuhc Jul 23, 2024
784e9ac
fix mutex-group constant
tyiuhc Jul 23, 2024
a1ad381
add flag utils and testing
tyiuhc Jul 15, 2024
1b2963f
add cohort loader test
tyiuhc Jul 15, 2024
3496cc4
add deployment_runner tests
tyiuhc Jul 16, 2024
a762a3f
fix cohort download api
tyiuhc Jul 17, 2024
df56478
update implementations and tests for local eval v2
tyiuhc Jul 23, 2024
53c6be6
fix error handling and tests
tyiuhc Jul 24, 2024
877260f
clean up imports, fix lint
tyiuhc Jul 24, 2024
80388e8
add server zone
tyiuhc Jul 24, 2024
aac0fa0
update tests and fix USER_GROUP const undefined
tyiuhc Jul 26, 2024
115dbd9
merge with main
tyiuhc Jul 29, 2024
f85f857
fix factory tests
tyiuhc Jul 29, 2024
8fff5fc
update rubocop
tyiuhc Jul 29, 2024
b8ad1ab
cohort not modfied should not throw error
tyiuhc Jul 31, 2024
7dacc49
do not throw exception upon start() if cohort download fails, log war…
tyiuhc Aug 1, 2024
bad47b4
add live unit tests for evaluate with cohorts
tyiuhc Aug 2, 2024
721113f
fix spec_helper and test yml
tyiuhc Aug 2, 2024
1146fa3
fix flag_config_fetcher
tyiuhc Aug 2, 2024
53eb235
simplify download/update cohorts for deployment_runner, fix desc to c…
tyiuhc Aug 5, 2024
6ea735e
fix rubocop
tyiuhc Aug 5, 2024
5b688bd
cohort update draws from flag configs
tyiuhc Aug 6, 2024
3a3d313
nit: fix log message
tyiuhc Aug 6, 2024
87c0ebc
update cohort_sync_config fields: include polling and remove request …
tyiuhc Aug 6, 2024
36f1318
nit: check cohort_sync_config exists to create cohort poller
tyiuhc Aug 6, 2024
8d77c7c
fix lint
tyiuhc Aug 6, 2024
564bea1
add SDK+version to cohort request headers
tyiuhc Aug 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/pull-request-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on: [pull_request]

jobs:
test:
environment: Unit Test
runs-on: ubuntu-latest
strategy:
matrix:
Expand All @@ -16,4 +17,9 @@ jobs:
ruby-version: ${{ matrix.ruby-version }}
bundler-cache: true # runs 'bundle install' and caches installed gems automatically
- name: Run tests and lint
env:
API_KEY: ${{ secrets.API_KEY }}
SECRET_KEY: ${{ secrets.SECRET_KEY }}
EU_API_KEY: ${{ secrets.EU_API_KEY }}
EU_SECRET_KEY: ${{ secrets.EU_SECRET_KEY }}
run: bundle exec rake
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
.DS_Store

# Used by dotenv library to load environment variables.
# .env
.env

# Ignore Byebug command history file.
.byebug_history
Expand Down
3 changes: 2 additions & 1 deletion amplitude-experiment.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'rake', '~> 13.0'
spec.add_development_dependency 'rdoc', '= 6.4'
spec.add_development_dependency 'rspec', '~> 3.6'
spec.add_development_dependency 'rubocop', '= 1.21'
spec.add_development_dependency 'rubocop', '= 1.22.3'
spec.add_development_dependency 'simplecov', '~> 0.21'
spec.add_development_dependency 'webmock', '~> 3.14'
spec.add_development_dependency 'yard', '~> 0.9'
spec.add_development_dependency 'dotenv', '~> 2.8.1'
spec.metadata['rubygems_mfa_required'] = 'false'
spec.add_runtime_dependency 'ffi', '~> 1.15'
end
11 changes: 10 additions & 1 deletion lib/amplitude-experiment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
require 'experiment/remote/client'
require 'experiment/local/client'
require 'experiment/local/config'
require 'experiment/local/fetcher'
require 'experiment/local/assignment/assignment'
require 'experiment/local/assignment/assignment_filter'
require 'experiment/local/assignment/assignment_service'
Expand All @@ -19,6 +18,16 @@
require 'experiment/util/user'
require 'experiment/util/variant'
require 'experiment/error'
require 'experiment/util/flag_config'
require 'experiment/flag/flag_config_fetcher'
require 'experiment/flag/flag_config_storage'
require 'experiment/cohort/cohort_download_api'
require 'experiment/cohort/cohort'
require 'experiment/cohort/cohort_loader'
require 'experiment/cohort/cohort_storage'
require 'experiment/cohort/cohort_sync_config'
require 'experiment/deployment/deployment_runner'
require 'experiment/util/poller'

# Amplitude Experiment Module
module AmplitudeExperiment
Expand Down
25 changes: 25 additions & 0 deletions lib/experiment/cohort/cohort.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module AmplitudeExperiment
USER_GROUP_TYPE = 'User'.freeze
# Cohort
class Cohort
attr_accessor :id, :last_modified, :size, :member_ids, :group_type

def initialize(id, last_modified, size, member_ids, group_type = USER_GROUP_TYPE)
@id = id
@last_modified = last_modified
@size = size
@member_ids = member_ids.to_set
@group_type = group_type
end

def ==(other)
return false unless other.is_a?(Cohort)

@id == other.id &&
@last_modified == other.last_modified &&
@size == other.size &&
@member_ids == other.member_ids &&
@group_type == other.group_type
end
end
end
90 changes: 90 additions & 0 deletions lib/experiment/cohort/cohort_download_api.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
require 'base64'
require 'json'
require 'net/http'
require 'uri'
require 'set'

module AmplitudeExperiment
# CohortDownloadApi
class CohortDownloadApi
COHORT_REQUEST_TIMEOUT_MILLIS = 5000
COHORT_REQUEST_RETRY_DELAY_MILLIS = 100

def get_cohort(cohort_id, cohort = nil)
raise NotImplementedError
end
end

# DirectCohortDownloadApi
class DirectCohortDownloadApi < CohortDownloadApi
def initialize(api_key, secret_key, max_cohort_size, server_url, logger)
super()
@api_key = api_key
@secret_key = secret_key
@max_cohort_size = max_cohort_size
@server_url = server_url
@logger = logger
end

def get_cohort(cohort_id, cohort = nil)
@logger.debug("getCohortMembers(#{cohort_id}): start")
errors = 0

loop do
begin
last_modified = cohort.nil? ? nil : cohort.last_modified
response = get_cohort_members_request(cohort_id, last_modified)
@logger.debug("getCohortMembers(#{cohort_id}): status=#{response.code}")

case response.code.to_i
when 200
cohort_info = JSON.parse(response.body)
@logger.debug("getCohortMembers(#{cohort_id}): end - resultSize=#{cohort_info['size']}")
return Cohort.new(
cohort_info['cohortId'],
cohort_info['lastModified'],
cohort_info['size'],
cohort_info['memberIds'].to_set,
cohort_info['groupType']
)
when 204
@logger.debug("getCohortMembers(#{cohort_id}): Cohort not modified")
return nil
when 413
raise CohortTooLargeError.new(cohort_id, "Cohort exceeds max cohort size: #{response.code}")
else
raise HTTPErrorResponseError.new(response.code, cohort_id, "Unexpected response code: #{response.code}") if response.code.to_i != 202

end
rescue StandardError => e
errors += 1 unless response && e.is_a?(HTTPErrorResponseError) && response.code.to_i == 429
@logger.debug("getCohortMembers(#{cohort_id}): request-status error #{errors} - #{e}")
raise e if errors >= 3 || e.is_a?(CohortTooLargeError)
end

sleep(COHORT_REQUEST_RETRY_DELAY_MILLIS / 1000.0)
end
end

private

def get_cohort_members_request(cohort_id, last_modified)
headers = {
'Authorization' => "Basic #{basic_auth}",
'Content-Type' => 'application/json;charset=utf-8',
'X-Amp-Exp-Library' => "experiment-ruby-server/#{VERSION}"
}
url = "#{@server_url}/sdk/v1/cohort/#{cohort_id}?maxCohortSize=#{@max_cohort_size}"
url += "&lastModified=#{last_modified}" if last_modified

request = Net::HTTP::Get.new(URI(url), headers)
http = PersistentHttpClient.get(@server_url, { read_timeout: COHORT_REQUEST_TIMEOUT_MILLIS }, basic_auth)
http.request(request)
end

def basic_auth
credentials = "#{@api_key}:#{@secret_key}"
Base64.strict_encode64(credentials)
end
end
end
39 changes: 39 additions & 0 deletions lib/experiment/cohort/cohort_loader.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
module AmplitudeExperiment
# CohortLoader
class CohortLoader
def initialize(cohort_download_api, cohort_storage)
@cohort_download_api = cohort_download_api
@cohort_storage = cohort_storage
@jobs = {}
@lock_jobs = Mutex.new
end

def load_cohort(cohort_id)
@lock_jobs.synchronize do
unless @jobs.key?(cohort_id)
future = Concurrent::Promises.future do
load_cohort_internal(cohort_id)
ensure
remove_job(cohort_id)
end
@jobs[cohort_id] = future
end
@jobs[cohort_id]
end
end

private

def load_cohort_internal(cohort_id)
stored_cohort = @cohort_storage.cohort(cohort_id)
updated_cohort = @cohort_download_api.get_cohort(cohort_id, stored_cohort)
@cohort_storage.put_cohort(updated_cohort) unless updated_cohort.nil?
end

def remove_job(cohort_id)
@lock_jobs.synchronize do
@jobs.delete(cohort_id)
end
end
end
end
91 changes: 91 additions & 0 deletions lib/experiment/cohort/cohort_storage.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
module AmplitudeExperiment
# CohortStorage
class CohortStorage
def cohort(cohort_id)
raise NotImplementedError
end

def cohorts
raise NotImplementedError
end

def get_cohorts_for_user(user_id, cohort_ids)
raise NotImplementedError
end

def get_cohorts_for_group(group_type, group_name, cohort_ids)
raise NotImplementedError
end

def put_cohort(cohort_description)
raise NotImplementedError
end

def delete_cohort(group_type, cohort_id)
raise NotImplementedError
end

def cohort_ids
raise NotImplementedError
end
end

class InMemoryCohortStorage < CohortStorage
def initialize
super
@lock = Mutex.new
@group_to_cohort_store = {}
@cohort_store = {}
end

def cohort(cohort_id)
@lock.synchronize do
@cohort_store[cohort_id]
end
end

def cohorts
@lock.synchronize do
@cohort_store.dup
end
end

def get_cohorts_for_user(user_id, cohort_ids)
get_cohorts_for_group(USER_GROUP_TYPE, user_id, cohort_ids)
end

def get_cohorts_for_group(group_type, group_name, cohort_ids)
result = Set.new
@lock.synchronize do
group_type_cohorts = @group_to_cohort_store[group_type] || Set.new
group_type_cohorts.each do |cohort_id|
members = @cohort_store[cohort_id]&.member_ids || Set.new
result.add(cohort_id) if cohort_ids.include?(cohort_id) && members.include?(group_name)
end
end
result
end

def put_cohort(cohort)
@lock.synchronize do
@group_to_cohort_store[cohort.group_type] ||= Set.new
@group_to_cohort_store[cohort.group_type].add(cohort.id)
@cohort_store[cohort.id] = cohort
end
end

def delete_cohort(group_type, cohort_id)
@lock.synchronize do
group_cohorts = @group_to_cohort_store[group_type] || Set.new
group_cohorts.delete(cohort_id)
@cohort_store.delete(cohort_id)
end
end

def cohort_ids
@lock.synchronize do
@cohort_store.keys.to_set
end
end
end
end
27 changes: 27 additions & 0 deletions lib/experiment/cohort/cohort_sync_config.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module AmplitudeExperiment
DEFAULT_COHORT_SYNC_URL = 'https://cohort-v2.lab.amplitude.com'.freeze
EU_COHORT_SYNC_URL = 'https://cohort-v2.lab.eu.amplitude.com'.freeze

# Experiment Cohort Sync Configuration
class CohortSyncConfig
# This configuration is used to set up the cohort loader. The cohort loader is responsible for
# downloading cohorts from the server and storing them locally.
# Parameters:
# api_key (str): The project API Key
# secret_key (str): The project Secret Key
# max_cohort_size (int): The maximum cohort size that can be downloaded
# cohort_polling_interval_millis (int): The interval in milliseconds to poll for cohorts, the minimum value is 60000
# cohort_server_url (str): The server endpoint from which to request cohorts

attr_accessor :api_key, :secret_key, :max_cohort_size, :cohort_polling_interval_millis, :cohort_server_url

def initialize(api_key, secret_key, max_cohort_size: 2_147_483_647, cohort_polling_interval_millis: 60_000,
cohort_server_url: DEFAULT_COHORT_SYNC_URL)
@api_key = api_key
@secret_key = secret_key
@max_cohort_size = max_cohort_size
@cohort_polling_interval_millis = [cohort_polling_interval_millis, 60_000].max
@cohort_server_url = cohort_server_url
end
end
end
Loading
Loading