Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

Revision a7332d65405be83d1eee4a0f1b0baaf64f102cb5 authored by Richard Petri on 07 November 2023, 09:38:29 UTC, committed by Richard Petri on 27 February 2024, 19:35:24 UTC
Add option to connect to running OpenOCD Server
1 parent 546f637
  • Files
  • Changes
  • 217bed2
  • /
  • platforms.py
Raw File Download

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • revision
  • directory
  • content
revision badge
swh:1:rev:a7332d65405be83d1eee4a0f1b0baaf64f102cb5
directory badge Iframe embedding
swh:1:dir:217bed207a8bd39e391861508ee134e612e97964
content badge Iframe embedding
swh:1:cnt:419b326705c8dc7d9c369e3845d25ce592545854

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • revision
  • directory
  • content
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
platforms.py
from mupq import mupq

import abc
import re
import serial
import socket
import subprocess
import time
import os
import os.path
import tqdm

try:
    import chipwhisperer as cw
except ImportError:
    pass


class Qemu(mupq.Platform):

    start_pat = re.compile('.*={4,}\n', re.DOTALL)
    end_pat = re.compile('#\n', re.DOTALL)

    def __init__(self, qemu, machine):
        super().__init__()
        self.qemu = qemu
        self.machine = machine
        self.platformname = "qemu"

    def __enter__(self):
        return super().__enter__()

    def __exit__(self, *args, **kwargs):
        return super().__exit__(*args, **kwargs)

    def run(self, binary_path, expiterations=1):
        if expiterations > 1:
            pb = tqdm.tqdm(total=expiterations, leave=False, desc="Running...")
        args = [
            self.qemu,
            "-M",
            self.machine,
            "-nographic",
            "-semihosting",
            "-kernel",
            binary_path,
        ]
        self.log.info(f'Running QEMU: {" ".join(args)}')
        try:
            proc = subprocess.Popen(args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, encoding="ascii")
            output = ""
            while "#" not in output:
                buf = proc.stdout.readline()
                # print(buf)
                output += buf
                if expiterations > 1:
                    if "+" in buf:
                        pb.update(buf.count("+"))
                    else:
                        pb.refresh()
            proc.wait()
        except Exception:
            try:
                if proc and proc.poll is not None:
                    proc.kill()
            except Exception:
                pass
            raise
        start = self.start_pat.search(output)
        end = self.end_pat.search(output, start.end())
        if end is None:
            return 'ERROR'
        proc.wait()
        if expiterations > 1:
            pb.close()
        return output[start.end():end.start()]


class SerialCommsPlatform(mupq.Platform):

    # Start pattern is at least five equal signs
    start_pat = re.compile(b'.*={4,}\n', re.DOTALL)

    def __init__(self, tty="/dev/ttyACM0", baud=38400, timeout=1):
        super().__init__()
        self._dev = serial.Serial(tty, baud, timeout=timeout)

    def __enter__(self):
        return super().__enter__()

    def __exit__(self, *args, **kwargs):
        self._dev.close()
        return super().__exit__(*args, **kwargs)

    def run(self, binary_path, expiterations=1):
        if expiterations > 1:
            pb = tqdm.tqdm(total=expiterations, leave=False, desc="Running...")
        self._dev.reset_input_buffer()
        self.flash(binary_path)
        # Wait for the first equal sign
        if self._dev.read_until(b'=')[-1] != b'='[0]:
            raise RuntimeError('Timout waiting for start')
        # Wait for the end of the equal delimiter
        start = self._dev.read_until(b'\n')
        self.log.debug(f'Found start pattern: {start}')
        if self.start_pat.fullmatch(start) is None:
            raise RuntimeError('Start does not match')
        # Wait for the end
        output = bytearray()
        while len(output) == 0 or output[-1] != b'#'[0]:
            data = self._dev.read_until(b'#', 128)
            if expiterations > 1:
                if b"+" in data:
                    pb.update(data.count(b"+"))
                else:
                    pb.refresh()
            output.extend(data)
        if expiterations > 1:
            pb.close()
        return output[:-1].decode('utf-8', 'ignore')

    @abc.abstractmethod
    def flash(self, binary_path):
        pass


class OpenOCD(SerialCommsPlatform):
    def __init__(self, script, tty="/dev/ttyACM0", baud=38400, timeout=60):
        super().__init__(tty, baud, timeout)
        self.script = script

    def flash(self, binary_path):
        subprocess.check_call(
            ["openocd", "-f", self.script, "-c", f"program {binary_path} verify reset exit"],
            # stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )


class OpenOCDTcp(SerialCommsPlatform):
    def __init__(self, port=6666, tty="/dev/ttyACM0", baud=38400, timeout=60):
        super().__init__(tty, baud, timeout)
        self.socket = socket.create_connection(("localhost", port))

    def _cmd(self, *commands):
        n = len(commands)
        commands = b"\x1a".join(command.encode("ascii")
                                for command in commands) + b"\x1a"
        self.socket.sendall(commands)
        buf = bytearray()
        while buf.count(b"\x1a") != n:
            buf.extend(self.socket.recv(64))
        return [b.decode("utf-8") for b in buf.split(b"\x1a")[:-1]]

    def flash(self, binary_path):
        binary_path = os.path.abspath(binary_path)
        self._cmd("reset halt",
                  f"program {binary_path} verify reset",
                  "resume")


class StLink(SerialCommsPlatform):
    def flash(self, binary_path):
        extraargs = []
        if os.getenv("MUPQ_ST_FLASH_ARGS") is not None:
            extraargs = os.getenv("MUPQ_ST_FLASH_ARGS").split()
        subprocess.check_call(
            ["st-flash"] + extraargs + ["--reset", "write", binary_path, "0x8000000"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )


class ChipWhisperer(mupq.Platform):

    # Start pattern is at least five equal signs
    start_pat = re.compile('.*={4,}\n', re.DOTALL)
    # End pattern is a hash with a newline
    end_pat = re.compile('.*#\n', re.DOTALL)

    def __init__(self):
        super().__init__()
        self.platformname = "cw"
        self.scope = cw.scope()
        self.target = cw.target(self.scope)
        self.scope.default_setup()

    def __enter__(self):
        return super().__enter__()

    def __exit__(self, *args, **kwargs):
        self.target.close()
        return super().__exit__(*args, **kwargs)

    def device(self):
        return self.wrapper

    def reset_target(self):
        self.scope.io.nrst = 'low'
        time.sleep(0.05)
        self.scope.io.nrst = 'high'
        time.sleep(0.05)

    def flash(self, binary_path):
        prog = cw.programmers.STM32FProgrammer()
        prog.scope = self.scope
        prog.open()
        prog.find()
        prog.erase()
        prog.program(binary_path, memtype="flash", verify=False)
        prog.close()

    def run(self, binary_path, expiterations=1):
        if expiterations > 1:
            pb = tqdm.tqdm(total=expiterations, leave=False, desc="Running...")
        self.flash(binary_path)
        self.target.flush()
        self.reset_target()
        data = ''
        # Wait for the first equal sign
        while '=' not in data:
            data += self.target.read()
        # Wait for the end of the equal delimiter
        match = None
        while match is None:
            buf = self.target.read()
            data += buf
            match = self.start_pat.match(data)
        # Remove the start pattern
        data = data[match.end():]
        # Wait for the end
        match = None
        while match is None:
            buf = self.target.read()
            data += buf
            if expiterations > 1:
                if "+" in buf:
                    pb.update(buf.count("+"))
                else:
                    pb.refresh()
            match = self.end_pat.match(data)
        # Remove stop pattern and return
        if expiterations > 1:
            pb.close()
        return data[:match.end() - 2]
The diff you're trying to view is too large. Only the first 1000 changed files have been loaded.
Showing with 0 additions and 0 deletions (0 / 0 diffs computed)
swh spinner

Computing file changes ...

back to top

Software Heritage — Copyright (C) 2015–2025, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API