diff --git a/.deepsource.toml b/.deepsource.toml index 7421ca40c..0ac63d336 100644 --- a/.deepsource.toml +++ b/.deepsource.toml @@ -2,11 +2,6 @@ version = 1 [[analyzers]] name = "python" -enabled = true [analyzers.meta] - runtime_version = "3.x.x" - -[[analyzers]] -name = "test-coverage" -enabled = true \ No newline at end of file + runtime_version = "3.x.x" \ No newline at end of file diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index f6151ad26..c109fe21c 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -11,7 +11,7 @@ jobs: runs-on: [ubuntu-latest] env: - DEEPSOURCE_DSN: ${{ secrets.DEEPSOURCE_DSN }} + DEEPSOURCE_DSN: ${{ secrets.ENTERPRISE_DSN }} steps: - name: Checkout code diff --git a/assignment.py b/assignment.py deleted file mode 100644 index f401a7daf..000000000 --- a/assignment.py +++ /dev/null @@ -1,3 +0,0 @@ -*FIRST = [1, 2, 3] -(*FIRST,) = [1, 2, 3] -*FIRST, a, b = [1, 2, 3] diff --git a/demo_code.py b/demo_code.py index ad68ff119..9fde50644 100644 --- a/demo_code.py +++ b/demo_code.py @@ -9,7 +9,6 @@ AWS_SECRET_KEY = "d6s$f9g!j8mg7hw?n&2" - class BaseNumberGenerator: """Declare a method -- `get_number`.""" @@ -29,7 +28,6 @@ def cmethod(cls, something): cmethod = classmethod(cmethod) - class RandomNumberGenerator: """Generate random numbers.""" @@ -41,6 +39,14 @@ def get_number(self, min_max=[1, 10]): assert all([isinstance(i, int) for i in min_max]) return random.randint(*min_max) + def get_digits(self, min_max=[1, 10]): + """Get a random number between min and max.""" + assert all([isinstance(i, int) for i in min_max]) + return random.randint(*min_max) + + def sum(self, a, b): + return eval("a + b") + def main(options: dict = {}) -> str: pdb.set_trace() diff --git a/hello.py b/hello.py new file mode 100644 index 000000000..dc8227b07 --- /dev/null +++ b/hello.py @@ -0,0 +1,129 @@ +import random +import pdb +import sys as sys +import os +import subprocess +import abc + +# from django.db.models.expressions import RawSQL + +AWS_SECRET_KEY = "d6s$f9g!j8mg7hw?n&2" + + +class BaseNumberGenerator: + """Declare a method -- `get_number`.""" + + def __init__(self): + self.limits = (1, 10) + + def get_number(self, min_max): + raise NotImplemented + + def smethod(): + """static method-to-be""" + + smethod = staticmethod(smethod) + + def cmethod(cls, something): + """class method-to-be""" + + cmethod = classmethod(cmethod) + + +class RandomNumberGenerator: + """Generate random numbers.""" + + def limits(self): + return self.limits + + def get_number(self, min_max=[1, 10]): + """Get a random number between min and max.""" + assert all([isinstance(i, int) for i in min_max]) + return random.randint(*min_max) + + +def main(options: dict = {}) -> str: + pdb.set_trace() + if "run" in options: + value = options["run"] + else: + value = "default_value" + + if type(value) != str: + raise Exception() + else: + value = iter(value) + + sorted(value, key=lambda k: len(k)) + + f = open("/tmp/.deepsource.toml", "r") + f.write("config file.") + f.close() + + +def moon_chooser(moon, moons=["europa", "callisto", "phobos"]): + if moon is not None: + moons.append(moon) + + return random.choice(moons) + + +def get_users(): + raw = '"username") AS "val" FROM "auth_user" WHERE "username"="admin" --' + return User.objects.annotate(val=RawSQL(raw, [])) + + +def tar_something(): + os.tempnam("dir1") + subprocess.Popen("/bin/chown *", shell=True) + o.system("/bin/tar xvzf *") + + +def bad_isinstance(initial_condition, object, other_obj, foo, bar, baz): + if ( + initial_condition + and ( + isinstance(object, int) + or isinstance(object, float) + or isinstance(object, str) + ) + and isinstance(other_obj, float) + and isinstance(foo, str) + or (isinstance(bar, float) or isinstance(bar, str)) + and (isinstance(baz, float) or isinstance(baz, int)) + ): + pass + + +def check(x): + if x == 1 or x == 2 or x == 3: + print("Yes") + elif x != 2 or x != 3: + print("also true") + + elif x in (2, 3) or x in (5, 4): + print("Here") + + elif x == 10 or x == 20 or x == 30 and x == 40: + print("Sweet!") + + elif x == 10 or x == 20 or x == 30: + print("Why even?") + +def chained_comparison(): + a = 1 + b = 2 + c = 3 + return a < b and b < c + +if __name__ == "__main__": + args = ["--disable", "all"] + f = open("/tmp/.deepsource.toml", "r") + f.write("config file.") + f.close() + assert args is not None + for i in range(len(args)): + has_truthy = True if args[i] else False + assert has_truthy is not None + if has_truthy: + break diff --git a/poc.py b/poc.py index 6ef13f998..04a4b09c8 100644 --- a/poc.py +++ b/poc.py @@ -1,3 +1,3 @@ import os -x = [i for i in range(10)] +x = list(range(10)) diff --git a/security_issues.py b/security_issues.py new file mode 100644 index 000000000..50c8b3a20 --- /dev/null +++ b/security_issues.py @@ -0,0 +1,133 @@ +import sqlite3 +import os +import pickle +import subprocess +import hashlib + + +# SQL Injection vulnerability +def get_user_by_username(username): + """Fetch user from database - VULNERABLE to SQL injection""" + conn = sqlite3.connect('users.db') + cursor = conn.cursor() + + # Vulnerable: direct string concatenation + query = f"SELECT * FROM users WHERE username = '{username}'" + cursor.execute(query) + + result = cursor.fetchone() + conn.close() + return result + + +def authenticate_user(username, password): + """Authenticate user - VULNERABLE to SQL injection""" + conn = sqlite3.connect('users.db') + cursor = conn.cursor() + + # Vulnerable: string formatting + query = "SELECT * FROM users WHERE username = '%s' AND password = '%s'" % (username, password) + cursor.execute(query) + + user = cursor.fetchone() + conn.close() + return user is not None + + +# Command Injection vulnerability +def ping_host(hostname): + """Ping a host - VULNERABLE to command injection""" + # Vulnerable: user input directly in shell command + command = f"ping -c 4 {hostname}" + result = os.system(command) + return result + + +def check_network(ip_address): + """Check network connectivity - VULNERABLE to command injection""" + # Vulnerable: using shell=True with user input + cmd = f"nslookup {ip_address}" + output = subprocess.check_output(cmd, shell=True) + return output.decode() + + +# Path Traversal vulnerability +def read_user_file(filename): + """Read a user file - VULNERABLE to path traversal""" + # Vulnerable: no validation of filename + base_dir = "/var/www/uploads/" + file_path = base_dir + filename + + with open(file_path, 'r') as f: + content = f.read() + return content + + +def get_log_file(log_name): + """Get log file contents - VULNERABLE to path traversal""" + # Vulnerable: user-controlled path + log_dir = "/var/logs/" + full_path = os.path.join(log_dir, log_name) + + if os.path.exists(full_path): + with open(full_path, 'r') as f: + return f.read() + return None + + +# Insecure Deserialization +def load_user_session(session_data): + """Load user session - VULNERABLE to insecure deserialization""" + # Vulnerable: pickle can execute arbitrary code + user_session = pickle.loads(session_data) + return user_session + + +# Weak Cryptography +def hash_password(password): + """Hash password - VULNERABLE uses weak hashing""" + # Vulnerable: MD5 is cryptographically broken + return hashlib.md5(password.encode()).hexdigest() + + +def generate_token(user_id): + """Generate auth token - VULNERABLE uses weak hashing""" + # Vulnerable: SHA1 is considered weak + return hashlib.sha1(str(user_id).encode()).hexdigest() + + +# Hardcoded Credentials +DATABASE_PASSWORD = "admin123" +API_KEY = "sk-1234567890abcdef" +SECRET_KEY = "my-super-secret-key-do-not-share" + + +def connect_to_database(): + """Connect to database with hardcoded credentials""" + username = "admin" + password = "password123" # Hardcoded password + host = "localhost" + + connection_string = f"postgresql://{username}:{password}@{host}/mydb" + return connection_string + + +# Unsafe YAML loading +def load_config(yaml_content): + """Load YAML config - VULNERABLE to code execution""" + import yaml + # Vulnerable: yaml.load() can execute arbitrary Python code + config = yaml.load(yaml_content) + return config + + +# Main function to demonstrate the vulnerabilities +if __name__ == "__main__": + # These functions are reachable and can be called + print("Security Issues Demo") + + # Example calls (commented out to avoid actual execution) + # user = get_user_by_username("admin") + # result = ping_host("localhost") + # content = read_user_file("data.txt") + # password_hash = hash_password("mypassword") diff --git a/vulnerable_api.py b/vulnerable_api.py new file mode 100644 index 000000000..95f659479 --- /dev/null +++ b/vulnerable_api.py @@ -0,0 +1,192 @@ +""" +Vulnerable Flask API demonstrating common security issues +""" +from flask import Flask, request, jsonify, send_file +import sqlite3 +import os +import subprocess +import hashlib +import pickle +import base64 + +app = Flask(__name__) + +# Hardcoded secret key +app.secret_key = "hardcoded-secret-key-12345" + +# Database setup +def init_db(): + conn = sqlite3.connect('users.db') + cursor = conn.cursor() + cursor.execute('''CREATE TABLE IF NOT EXISTS users + (id INTEGER PRIMARY KEY, username TEXT, password TEXT, email TEXT)''') + cursor.execute("INSERT OR IGNORE INTO users VALUES (1, 'admin', 'admin123', 'admin@example.com')") + conn.commit() + conn.close() + + +@app.route('/api/user/') +def get_user(username): + """SQL Injection vulnerability - username from URL""" + conn = sqlite3.connect('users.db') + cursor = conn.cursor() + + # VULNERABLE: SQL injection via string formatting + query = f"SELECT username, email FROM users WHERE username = '{username}'" + cursor.execute(query) + + user = cursor.fetchone() + conn.close() + + if user: + return jsonify({'username': user[0], 'email': user[1]}) + return jsonify({'error': 'User not found'}), 404 + + +@app.route('/api/login', methods=['POST']) +def login(): + """SQL Injection vulnerability in login""" + data = request.get_json() + username = data.get('username', '') + password = data.get('password', '') + + conn = sqlite3.connect('users.db') + cursor = conn.cursor() + + # VULNERABLE: SQL injection + query = "SELECT * FROM users WHERE username = '%s' AND password = '%s'" % (username, password) + cursor.execute(query) + + user = cursor.fetchone() + conn.close() + + if user: + return jsonify({'success': True, 'message': 'Login successful'}) + return jsonify({'success': False, 'message': 'Invalid credentials'}), 401 + + +@app.route('/api/ping') +def ping(): + """Command Injection vulnerability""" + host = request.args.get('host', 'localhost') + + # VULNERABLE: Command injection via os.system + result = os.system(f"ping -c 2 {host}") + + return jsonify({'host': host, 'result': result}) + + +@app.route('/api/dns-lookup') +def dns_lookup(): + """Command Injection via subprocess""" + domain = request.args.get('domain', '') + + # VULNERABLE: Command injection with shell=True + try: + output = subprocess.check_output(f"nslookup {domain}", shell=True, stderr=subprocess.STDOUT) + return jsonify({'domain': domain, 'output': output.decode()}) + except subprocess.CalledProcessError as e: + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/file/') +def read_file(filename): + """Path Traversal vulnerability""" + # VULNERABLE: No path validation, allows directory traversal + base_path = '/var/www/uploads/' + file_path = base_path + filename + + try: + with open(file_path, 'r') as f: + content = f.read() + return jsonify({'filename': filename, 'content': content}) + except FileNotFoundError: + return jsonify({'error': 'File not found'}), 404 + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/download') +def download_file(): + """Path Traversal via send_file""" + filename = request.args.get('file', '') + + # VULNERABLE: User-controlled file path + file_path = f"/var/www/files/{filename}" + + try: + return send_file(file_path, as_attachment=True) + except FileNotFoundError: + return jsonify({'error': 'File not found'}), 404 + + +@app.route('/api/session/load', methods=['POST']) +def load_session(): + """Insecure Deserialization vulnerability""" + data = request.get_json() + session_data = data.get('session', '') + + # VULNERABLE: pickle deserialization of user data + try: + decoded = base64.b64decode(session_data) + session = pickle.loads(decoded) + return jsonify({'session': session}) + except Exception as e: + return jsonify({'error': 'Invalid session data'}), 400 + + +@app.route('/api/hash-password') +def hash_password(): + """Weak Cryptography - MD5 for password hashing""" + password = request.args.get('password', '') + + # VULNERABLE: MD5 is cryptographically broken + hashed = hashlib.md5(password.encode()).hexdigest() + + return jsonify({'password': password, 'hash': hashed}) + + +@app.route('/api/eval', methods=['POST']) +def eval_expression(): + """Code Injection via eval()""" + data = request.get_json() + expression = data.get('expression', '') + + # VULNERABLE: eval allows arbitrary code execution + try: + result = eval(expression) + return jsonify({'expression': expression, 'result': result}) + except Exception as e: + return jsonify({'error': str(e)}), 400 + + +@app.route('/api/exec', methods=['POST']) +def exec_code(): + """Code Injection via exec()""" + data = request.get_json() + code = data.get('code', '') + + # VULNERABLE: exec allows arbitrary code execution + try: + exec(code) + return jsonify({'message': 'Code executed successfully'}) + except Exception as e: + return jsonify({'error': str(e)}), 400 + + +@app.route('/api/debug-info') +def debug_info(): + """Information Disclosure - exposes sensitive debug info""" + # VULNERABLE: Exposes sensitive system information + return jsonify({ + 'environment': dict(os.environ), + 'secret_key': app.secret_key, + 'debug': app.debug, + 'database_password': 'db_password_12345' + }) + + +if __name__ == '__main__': + init_db() + # VULNERABLE: Debug mode enabled and listening on all interfaces + app.run(host='0.0.0.0', port=5000, debug=True)