Source code for sblegos.TransactionAnalyser

import os
import sys
from pathlib import Path

import numpy as np
import pandas as pd
import pylcs

absolute_path = os.fspath(Path.cwd().parent.parent.parent)
if absolute_path not in sys.path:
    sys.path.append(absolute_path)


[docs]class TransactionAnalyser(object): """ This class is used to analyse transactions of an address. It has methods that allows to perform on chain analysis of an address. """ def __init__(self, df_transactions, array_address): """ This class is used to analyse transactions of an address. It has methods that allows to perform on chain analysis of an address. It is initialized with a df_transactions containing all the transactions made by a list of addresses that should match the df_transactions Parameters ---------- df_transactions : pd.DataFrame The dataframe containing all the transactions of the addresses array_address : np.ndarray The ndarray containing a list of addresses """ assert isinstance(df_transactions, pd.DataFrame), "The df_transactions should be a pd.DataFrame" assert isinstance(array_address, np.ndarray), "The df_address should be a numpy array" self.gb_EOA_sorted = None self.df_seed_wallet_naive = None self.df_seed_wallet = None self.details_first_incoming_transaction = None self.details_first_outgoing_transaction = None self.array_address = np.intersect1d(array_address, df_transactions['EOA'].unique()) self.df_transactions = df_transactions[df_transactions['EOA'].isin(array_address)] # store the array of string transactions self.dict_add_interacted = None self.dict_add_string_tx = None self.dict_add_value_string_tx = None # set objects self.set_group_by_sorted_EOA() self.set_seed_wallet_naive() self.set_seed_wallet() self.set_details_first_incoming_transaction() self.set_details_first_outgoing_transaction()
[docs] def has_same_seed_naive(self, address): """ Return if the address has the same seed wallet as one of the seed wallet of the df_transactions Note 1. You should consider using count_same_seed_naive and applying a vectorized operation. 2. df_transaction could contain transactions from multiple network but the seed wallet of the address is filtered which prevent unexpected raise of the boolean. Parameters ---------- address : str The address to check Returns ------- has_same_seed : bool True if the address has the same seed wallet as one of the seed wallet of the df_transactions """ return self.count_same_seed_naive(address) > 0
[docs] def count_same_seed_naive(self, address): """ Return if the address has the same seed wallet as one of the seed wallet of the df_transactions If the df_seed_wallet is not set, it will set it Note df_transaction could contain transactions from multiple network but the seed wallet of the address is filtered which prevent unexpected raise of the boolean. Parameters ---------- address : str The address to check Returns ------- count_same_seed : int The number of addresses having the same seed wallet """ df_same_seed = self.get_address_same_seed(self.df_seed_wallet_naive, address) return df_same_seed.shape[0]
[docs] def has_same_seed(self, address): """ Return if the address has the same seed wallet as one of the seed wallet of the df_transactions using a non-naive algorithm. For some address the first transaction is not the incoming funding transaction. It is possible to interact with a smart contract even before receiving any fund. This algorithm takes that into account. 1. You should consider using count_same_seed_naive and applying a vectorized operation. 2. df_transaction could contain transactions from multiple network but the seed wallet of the address is filtered which prevent unexpected raise of the boolean. Parameters ---------- address : str The address to check Returns ------- has_same_seed : bool True if the address has the same seed wallet as one of the seed wallet of the df_transactions """ return self.count_same_seed(address)
[docs] def count_same_seed(self, address): """ Return the number of address having the same seed wallet as one of the seed wallet of the df_transactions using a non-naive algorithm. For some address the first transaction is not the incoming funding transaction. It is possible to interact with a smart contract even before receiving any fund. This algorithm takes that into account. Note that it does not retrieve the true funder through the internal transaction but the first incoming transaction. Parameters ---------- address : str The address to check Returns ------- count_same_seed : int The number of addresses having the same seed wallet as one of the seed wallet of the df_transactions """ if address in self.df_seed_wallet.to_address.values: # check that there normal incoming transactions df_same_seed = self.get_address_same_seed(self.df_seed_wallet, address) return df_same_seed.shape[0] else: return 0
[docs] @staticmethod def get_address_same_seed(df, address): """ Return a df of address that have the same seed wallet as the address given in parameter. Parameters ---------- df : pd.DataFrame The df to filter address : str The address to check Returns ------- df_same_seed : pd.DataFrame The df of address that have the same seed wallet as the address given in parameter. """ seed_add = df.loc[address, 'from_address'] df_same_seed = df.drop(address, axis=0).loc[ df.drop(address, axis=0)['from_address'] == seed_add] return df_same_seed
[docs] def has_suspicious_seed_behavior(self, address): """ Return a boolean whether the address has suspicious seed behavior. Most addresses have a seed wallet that is given by first transaction given by the naive algorithm. However, some addresses first transaction is no the first incoming transaction because they first interacted with a smart contract. This is a suspicious behavior. Parameters ---------- address : str The address to check Returns ------- has_suspicious_seed_behavior : bool True if the address has suspicious seed behavior """ return self.has_same_seed(address) != self.has_same_seed_naive(address)
[docs] def set_seed_wallet_naive(self): """ Set the df_seed_wallet_naive attribute of the class. It holds the seed wallet of the addresses in 'EOA' using a naive method that takes the from_address from the transaction of the address Returns ------- None Set the df_seed_wallet_naive attribute of the class """ self.df_seed_wallet_naive = self.gb_EOA_sorted.first().loc[:, ['from_address', 'to_address']]
[docs] def set_seed_wallet(self): """ Set the df_seed_wallet attribute of the class. It holds the seed wallet of the addresses in 'EOA' of df_transactions. It is a non-naive method that look for the first incoming transaction of the address to get the seed wallet. Returns ------- None Set the df_seed_wallet attribute of the class """ df_filtered = self.df_transactions[self.df_transactions['EOA'] == self.df_transactions['to_address']] df_gb = df_filtered.sort_values('block_timestamp', ascending=True).groupby('EOA') self.df_seed_wallet = df_gb.first().loc[:, ['from_address', 'to_address']]
[docs] def set_group_by_sorted_EOA(self): """ Set the gb_EOA_sorted attribute of the class it holds the df_transactions sorted by block_timestamp and grouped by EOA Returns ------- None Set the gb_EOA_sorted attribute of the class """ self.gb_EOA_sorted = self.df_transactions.sort_values('block_timestamp', ascending=True).groupby('EOA')
[docs] def set_details_first_incoming_transaction(self): """ Set the details_first_incoming_transaction attribute of the class. It holds the details of the first incoming transaction of the address given in parameter Parameters ---------- Returns ------- None Set the details_first_incoming_transaction attribute of the class """ df_filtered = self.df_transactions[self.df_transactions['EOA'] == self.df_transactions['to_address']] df_gb = df_filtered.sort_values('block_timestamp', ascending=True).groupby('EOA').first() cols = ['from_address', 'gas_limit', 'gas_used', 'eth_value', 'block_timestamp'] df_gb_first = df_gb.loc[:, cols].reset_index() self.details_first_incoming_transaction = df_gb_first.rename( columns=dict(from_address='first_in_tx_from', gas_limit='first_in_tx_gas_limit', gas_used='first_in_tx_gas_used', eth_value='first_in_tx_eth_value', block_timestamp='first_in_tx_timestamp'))
[docs] def set_details_first_outgoing_transaction(self): df_filtered = self.df_transactions[self.df_transactions['EOA'] == self.df_transactions['from_address']] df_gb = df_filtered.sort_values('block_timestamp', ascending=True).groupby('EOA').first() cols = ['to_address', 'gas_limit', 'gas_used', 'eth_value', 'block_timestamp'] df_gb_first = df_gb.loc[:, cols].reset_index() self.details_first_outgoing_transaction = df_gb_first.rename( columns=dict(from_address='first_out_tx_from', gas_limit='first_out_tx_gas_limit', gas_used='first_out_tx_gas_used', eth_value='first_out_tx_eth_value', block_timestamp='first_out_tx_timestamp'))
[docs] def has_less_than_n_transactions(self, address, n=5): """ Return a boolean whether the address has less than n transactions Parameters ---------- address : str The address to check n : int The number of transactions Returns ------- has_less_than_n_transactions : bool True if the address has less than n transactions """ return self.count_transactions(address) < n
[docs] def count_transactions(self, address): """ Return the number of transactions of the address Parameters ---------- address : str The address to check Returns ------- count_transactions : int The number of transactions of the address """ return self.gb_EOA_sorted.get_group(address).shape[0]
[docs] def set_dict_add_interacted(self): """ Set the dict_add_interacted attribute of the class. It holds a dictionary of address as key and the list of address interacted with as value. Returns None ------- """ if self.dict_add_interacted is None: dict_add_interacted = {} contributors = self.get_contributors() for address in contributors: df = self.gb_EOA_sorted.get_group(address) add_interacted = np.append(df['to_address'].to_numpy(), df['from_address'].to_numpy()) add_interacted = add_interacted.astype('str') unique_add_interacted = np.unique(add_interacted) unique_add_interacted = unique_add_interacted[unique_add_interacted != address] dict_add_interacted[address] = unique_add_interacted self.dict_add_interacted = dict_add_interacted
[docs] def count_interaction_with_other_contributor(self, address): """ Return the number of interactions of the address with other contributor (not itself) Parameters ---------- address : str The address to check Returns ------- count_interaction_with_other_contributor : int The number of interactions of the address with other contributor (not itself) """ self.set_dict_add_interacted() contributors = self.get_contributors() other_contributors = contributors[contributors != address] return self.count_interaction_any(address, other_contributors)
[docs] def has_interacted_with_other_contributor(self, address): """ Return a boolean whether the address has interacted with other contributor (not itself) Parameters ---------- address : str The address to check Returns ------- has_interacted_with_other_contributor : bool True if the address has interacted with one or more contributor of the grant """ return self.count_interaction_with_other_contributor(address) > 0
[docs] def count_interaction_any(self, address, array_address): """ Return an integer of the number of interactions with the addresses in the array_address Parameters ---------- address : str The address to check array_address : narray The array of addresses to check Returns ------- count_interaction_with_any : int The number of interactions with the addresses in the array_address """ unique_add_interacted = self.dict_add_interacted[address] return np.isin(unique_add_interacted, array_address).sum()
[docs] def has_interacted_with_any(self, address, array_address): """ Return a boolean whether the address has interacted with any address in the array_address Parameters ---------- address : str The address to check array_address : narray The array of addresses to check Returns ------- has_interacted_with_any : bool True if the address has interacted with one or more of the addresses in the array_address """ count_interaction_with_any = self.count_interaction_any(address, array_address) return count_interaction_with_any > 0
[docs] def get_contributors(self): """ Return a list of contributors of the grant Returns ------- contributors : narray The array of contributors of the grant """ return self.df_transactions['EOA'].unique()
[docs] def transaction_similitude_pylcs(self, address, algo_type="address_only", minimum_sim_tx=5): """ Return a boolean and the list of addresses if it finds other addresses with similar actions. it first stores some repetitive tasks into a class attribute and then use it to speed up the process. The algorithm is the following: 1. Transform all transactions in to a String of the form: "from_address, to_address, from_address, to_address, ..." 2. Replace the address of the wallet by "x" to ba able to compare the behavior of two addresses. 3. Run the algorithm common longest substring on all the transactions 4. If the longest common substring is longer than 5, return true for the current address. 5. Keep iterating to find the longest common substring and then the score is the length of the longest common substring divided by half the length of the target address string. The score is the min(score, 1) to avoid having a score > 1. Parameters ---------- address : str The address to check algo_type : str The type of algorithm to use. Default is "address_only" which only use the address to compare. options are: address_only, address_and_value minimum_sim_tx : int The number of transactions to use to compare. Default is 5. # char_tolerance : int # The number of character to skip when using the longest common substring algorithm. Default is 0. # 1 may be a good choice when algo_type is "address_and_value". Returns ------- has_similar_behavior : bool True if the address has similar behavior as another address score_similar_behavior : float The similarity score of the address list_similar_address : map The map of address and their similarity score """ str_transactions_target = None # Transform all transactions into a 1D string if algo_type == "address_only": if self.dict_add_string_tx is None: self.set_dict_add_string_transactions(algo_type) str_transactions_target = self.dict_add_string_tx.get(address) elif algo_type == "address_and_value": if self.dict_add_value_string_tx is None: self.set_dict_add_string_transactions(algo_type) # Get all the transactions of the address in a 1D array str_transactions_target = self.dict_add_value_string_tx.get(address) else: Exception("algo_type not supported") shape_target = self.get_address_transactions(address).shape[0] min_shape = max(1, int(shape_target / 4)) max_shape = max(shape_target, shape_target * 3) list_lcs = [] for add in self.array_address: if add != address: shape_other = self.get_address_transactions(add).shape[0] if min_shape < shape_other < max_shape: # Heuristic prevent comparing addresses with different shapes if algo_type == "address_only": str_transactions_other = self.dict_add_string_tx.get(add) else: str_transactions_other = self.dict_add_value_string_tx.get(add) lcs = self.longest_common_sub_string_pylcs(str_transactions_target, str_transactions_other) list_lcs.append(lcs) else: list_lcs.append(0) else: list_lcs.append(0) if minimum_sim_tx == -1: mask = np.array(list_lcs) > max(3, min(10, int(shape_target / 4))) else: mask = np.array(list_lcs) > minimum_sim_tx df_similar_address = pd.DataFrame(self.array_address[mask], columns=['address']) df_similar_address['lcs'] = np.array(list_lcs)[mask] len_tx = len(str_transactions_target) / 2 # Divide by 2 because we have from_address and to_address df_similar_address['score'] = df_similar_address.loc[:, 'lcs'].apply( lambda x: min(x / len_tx, 1)) return df_similar_address.set_index('address')
[docs] @staticmethod def get_array_transactions(df_address_transactions, address, algo_type="address_only"): """ This method replace the target address by an arbitrary "x" to be able to compare the similitude of two wallet. Parameters ---------- df_address_transactions : pd.DataFrame The data frame of transactions address : str The address to replace by x algo_type : str The type of algorithm to use, "address_only" only return from_address and to_address with the address replaced by x "address_and_value" return from_address, value, to_address with the address replaced by x Returns ------- array_transactions : narray An array of strings """ df_address_transactions.sort_values('block_timestamp', ascending=True, inplace=True) if algo_type == "address_only": try: array_transactions = df_address_transactions.loc[:, ['from_address', 'to_address']].dropna() \ .apply(lambda x: x.str[:8]) \ .replace(address[:8], 'x') \ .agg('-'.join, axis=1) \ .values except Exception as e: array_transactions = [] print(e) elif algo_type == "address_and_value": try: array_transactions = df_address_transactions.loc[:, ['from_address', 'value', 'to_address']].dropna() \ .apply(lambda x: x.str[:8]) \ .replace(address, 'x') \ .agg('-'.join, axis=1) \ .values except Exception as e: array_transactions = [] print(e) else: raise ValueError("algo_type must be either address_only or address_and_value") return array_transactions
[docs] def get_address_transactions(self, address): """ Get transactions of an address from the self.df_transaction df using the group by Parameters ---------- address : str The address to retrieve transactions Returns ------- df : pd.DataFrame The data frame with the transactions of the address """ try: df = self.gb_EOA_sorted.get_group(address) except Exception as e: df = pd.DataFrame() print(e) return df
[docs] def get_address_transactions_add(self, df, address): """ Get transactions of an address from a dataframe df Parameters ---------- df : pd.dataFrame Data frame of transactions with the 'EOA' column address : str The address to retrieve transactions Returns ------- df : pd.DataFrame The data frame with the transactions of the address """ return df[self.df_transactions['EOA'] == address]
[docs] def set_dict_add_string_transactions(self, algo_type="address_only"): """ This method create a dictionary with the address as key and the array of transactions as value. The array of transactions is the array crated with the get_array_transactions method. Parameters ---------- algo_type : str The type of algorithm to use Returns ------- None it sets the self.dict_add_string_tx or self.dict_add_value_string_tx attribute """ if self.gb_EOA_sorted is None: gb_address = self.df_transactions.groupby('EOA') else: gb_address = self.gb_EOA_sorted if algo_type == "address_only": if self.dict_add_string_tx is None: self.dict_add_string_tx = self.get_dict_string_tx(gb_address, algo_type=algo_type) elif algo_type == "address_and_value": if self.dict_add_value_string_tx is None: self.dict_add_value_string_tx = self.get_dict_string_tx(gb_address, algo_type=algo_type) else: raise ValueError("algo_type must be either address_only or address_and_value")
[docs] def get_dict_string_tx(self, gb_address, algo_type="address_only"): dict_string_tx = {} for address, df_address in gb_address: array_transactions = self.get_array_transactions(df_address, address, algo_type) dict_string_tx[address] = "".join(array_transactions) return dict_string_tx
[docs] @staticmethod def longest_common_sub_string_pylcs(string_target, string_other): # 1 similar transaction equals to 8 first char of the address + "-" + "x" = 10 char lcs = pylcs.lcs_string_length(string_target, string_other) return lcs // 10 # quotient of the division
[docs] @staticmethod def get_mean_score_lcs(lcs): if lcs.shape[0] == 0: return 0 else: return lcs.reset_index()['score'].mean()
[docs] @staticmethod def get_max_score_lcs(lcs): if lcs.shape[0] == 0: return 0 else: return lcs.reset_index()['score'].max()
[docs] def get_df_seeder_count(self): return self.df_seed_wallet.groupby('from_address').count().sort_values(by='to_address', ascending=False).reset_index().drop( columns=['to_address']).rename(columns={'from_address': 'seeder', 'EOA': 'count_seed'})
[docs] def get_df_features(self, list_features=None): """ Get the features of the transaction dataset Parameters ---------- list_features : list The list of features to retrieve, if None, the default features are retrieved : ['count_tx', 'less_10_tx', 'count_same_seed', 'count_same_seed_naive','same_seed', 'same_seed_naive', 'seed_suspicious', 'count_interact_other_ctbt','details_first_incoming_transaction', 'details_first_outgoing_transaction'] if 'all' is passed, the lcs feature is added Returns ------- df_features : pd.DataFrame The data frame with the features index : EOA all unique addresses in the df_transactions """ default_features = ['count_tx', 'less_10_tx', 'count_same_seed', 'count_same_seed_naive', 'same_seed', 'same_seed_naive', 'seed_suspicious', 'count_interact_other_ctbt', 'details_first_incoming_transaction', 'details_first_outgoing_transaction'] if list_features is None: list_features = default_features elif list_features == 'all': list_features = default_features + ['lcs'] if 'count_tx' in list_features: df_features = self.gb_EOA_sorted['tx_hash'].count().reset_index().rename(columns={'tx_hash': 'count_tx'}) else: df_features = pd.DataFrame(self.df_transactions['EOA'].unique(), columns=['EOA']) if 'less_10_tx' in list_features: df_features['less_10_tx'] = df_features['count_tx'] <= 10 if 'count_same_seed' in list_features: df_features['count_same_seed'] = df_features['EOA'].apply(lambda x: self.count_same_seed(x)) if 'count_same_seed_naive' in list_features: df_features['count_same_seed_naive'] = df_features['EOA'].apply(lambda x: self.count_same_seed_naive(x)) if 'same_seed' in list_features: df_features['same_seed'] = df_features['count_same_seed'] > 0 if 'same_seed_naive' in list_features: df_features['same_seed_naive'] = df_features['count_same_seed_naive'] > 0 if 'seed_suspicious' in list_features: df_features['seed_suspicious'] = df_features.loc[:, 'same_seed'].ne(df_features.loc[:, 'same_seed_naive']) if 'count_interact_other_ctbt' in list_features: df_features['count_interact_other_ctbt'] = df_features['EOA'].apply( lambda x: self.count_interaction_with_other_contributor(x)) if 'lcs' in list_features: df_features['lcs'] = 0 df_features['cluster_size_lcs'] = 0 df_features['mean_score_lcs'] = 0 df_features['max_score_lcs'] = 0 df_bool_less_10_tx = df_features['less_10_tx'] if df_bool_less_10_tx.sum() > 0: r = df_features.loc[df_bool_less_10_tx, 'EOA'].apply( lambda x: self.transaction_similitude_pylcs(x, minimum_sim_tx=3)) df_features.loc[df_bool_less_10_tx, 'cluster_size_lcs'] = r.apply(lambda x: len(x)) df_features.loc[df_bool_less_10_tx, 'mean_score_lcs'] = r.apply(lambda x: self.get_mean_score_lcs(x)) df_features.loc[df_bool_less_10_tx, 'max_score_lcs'] = r.apply(lambda x: self.get_max_score_lcs(x)) df_features['has_lcs'] = df_features['cluster_size_lcs'] > 0 if 'details_first_incoming_transaction' in list_features: details_first_incoming_transaction = self.details_first_incoming_transaction merge = df_features.merge(details_first_incoming_transaction, on='EOA', how='left') else: merge = df_features if 'details_first_outgoing_transaction' in list_features: details_first_outgoing_transaction = self.details_first_outgoing_transaction merge = merge.merge(details_first_outgoing_transaction, on='EOA', how='left') return merge