[System design] From service to architecture

This series is based on the system design lectures of the Jiuzhang ("Nine Chapters") Algorithm course.


Evaluation criteria of the system design interview

A rough scoring breakdown: feasible solution 25%, handling the special cases of the specific scenario 20%, analysis ability 25%, trade-offs 15%, knowledge base 15%.

In my understanding of the hiring bar, these questions first test communication: can you understand what the interviewer is asking, and can you express your own ideas clearly. Second, they test how you think about a problem: understanding the scenario, abstracting the problem, proposing a workable solution, and getting to the core of the problem as quickly as possible. To go further, show how comprehensively you can think about it: a feasible solution is never perfect, so analyze its advantages and disadvantages and give your own upgrade strategy. This is where trade-offs and knowledge base come into play.

4S analysis method

  • Scenario: figure out which scenarios and services are needed. QPS, DAU, interfaces.
  • Service: split the large system into its core modules.
  • Storage: how the data is stored. This involves object-oriented design, how to design the tables, and what structures to use for the data. SQL, NoSQL, file system.
  • Scale: scalability and upgrades. What problems does the current feasible solution have, and how do we upgrade it? (Upgrade: maintenance, robustness, single point of failure; scalability: coping with traffic surges.)

Scenario analysis: as a reference, Twitter has about 150M+ daily active users. Pick the core services: posting a tweet, Timeline, News Feed, etc. Then make reasonable estimates, including average concurrent users (daily actives * average requests per user / seconds in a day), peak traffic (about three times the average), and read/write QPS (roughly 300k reads, 5k writes).


The core of the Service step is to split and merge: design a service for each requirement, then merge the services that are the same.

Storage: a relational database suits user information; a non-relational database suits tweets, social graphs, etc.; a file system suits pictures and videos. Reasonable table design is also needed here.

System design-Twitter design newsfeed

Similar requirements exist in Weibo, WeChat Moments, etc. The core of a news feed system is the follow / followed-by relationship: everyone sees a different feed.

Storage - Pull model (fetch on read)

Idea: use K-way merging. When the user wants to view their feed, fetch the first 100 tweets from each account the user follows, then merge them to obtain the top K, essentially a K-way merge. See the sketch below.
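As a concrete illustration, here is a minimal Java sketch of the pull-side K-way merge, assuming each followee's timeline is already sorted newest-first; the Tweet class and the in-memory timelines are illustrative stand-ins, not a real storage API.

import java.util.*;

public class PullNewsFeed {
    static class Tweet {
        final long id;
        final long createdAt;            // larger = newer
        Tweet(long id, long createdAt) { this.id = id; this.createdAt = createdAt; }
    }

    // Merge the newest tweets of every followee with a max-heap (K-way merge).
    public static List<Tweet> merge(List<List<Tweet>> timelines, int k) {
        // heap entry: {timeline index, position inside that timeline}
        PriorityQueue<int[]> heap = new PriorityQueue<>(
            (a, b) -> Long.compare(timelines.get(b[0]).get(b[1]).createdAt,
                                   timelines.get(a[0]).get(a[1]).createdAt));
        for (int i = 0; i < timelines.size(); i++) {
            if (!timelines.get(i).isEmpty()) heap.offer(new int[]{i, 0});
        }
        List<Tweet> feed = new ArrayList<>();
        while (!heap.isEmpty() && feed.size() < k) {
            int[] top = heap.poll();
            feed.add(timelines.get(top[0]).get(top[1]));
            if (top[1] + 1 < timelines.get(top[0]).size()) {
                heap.offer(new int[]{top[0], top[1] + 1});   // advance within that timeline
            }
        }
        return feed;
    }
}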

Complexity analysis: reading the feed is slow, requiring N database reads (one per followee); posting is fast, a single write to the DB.

Disadvantage: every time the user opens the feed it is slow, because the tweets must be read and sorted from the database. The user experience is slightly worse.

Storage - Push model (fanout on write)

  • Idea: create a News Feed list for every user. When a user posts a tweet, push it into each follower's News Feed (keyword: fanout). Reading the feed then only needs to fetch the first 100 entries. A sketch follows this list.
  • Complexity analysis: reading is fast, one DB read. Posting is expensive: for N followers it takes N DB writes, but this step can be done asynchronously, so the poster does not have to wait for it.
  • Drawback: it is not instantaneous, because a user may have a huge number of followers, so there is a delay, which can be a real problem for celebrities' social networks.
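A minimal sketch of the fanout-on-write idea, with in-memory maps and a thread pool standing in for the DB tables and the async job queue; all names here are illustrative, not a real API.

import java.util.*;
import java.util.concurrent.*;

public class PushNewsFeed {
    private final Map<Long, List<Long>> followers = new ConcurrentHashMap<>();  // userId -> follower ids
    private final Map<Long, List<Long>> newsFeeds = new ConcurrentHashMap<>();  // userId -> tweet ids, newest first
    private final ExecutorService fanoutPool = Executors.newFixedThreadPool(4);

    public void follow(long followerId, long followeeId) {
        followers.computeIfAbsent(followeeId, id -> new CopyOnWriteArrayList<>()).add(followerId);
    }

    // Posting a tweet: return immediately, fan out to followers asynchronously.
    public void postTweet(long authorId, long tweetId) {
        fanoutPool.submit(() -> {
            for (long followerId : followers.getOrDefault(authorId, List.of())) {
                newsFeeds.computeIfAbsent(followerId, id -> Collections.synchronizedList(new ArrayList<>()))
                         .add(0, tweetId);   // one "DB write" per follower
            }
        });
    }

    // Reading the feed is a single lookup of the precomputed list.
    public List<Long> getNewsFeed(long userId, int k) {
        List<Long> feed = newsFeeds.getOrDefault(userId, List.of());
        return new ArrayList<>(feed.subList(0, Math.min(k, feed.size())));
    }
}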

Model comparison

Facebook uses pull; Twitter uses pull; Instagram uses push + pull. Pull is clearly the mainstream, because for these applications user experience is the core concern, so issues such as latency and inactive ("zombie") followers must be considered.

Push suits two-way (mutual) follow relationships such as WeChat Moments, where follower counts are small, real-time requirements are low, and resource usage is small. Pull suits high real-time requirements and one-way celebrity follows, such as Weibo.

upgrade

  • Pull: add a cache in front of the DB and cache some users' tweets; the cache holds each user's timeline as a message queue. N DB reads then become N cache requests. There is a trade-off here, and LRU can be used to evict stale entries. You can also cache each user's News Feed directly and maintain it there.
  • Push: keep the News Feed lists in memory. The cost of this is actually not high.

celebrity effect

For a hot celebrity, fanout pressure is huge. In the short term it can be absorbed by adding servers. Do not switch models as soon as this happens; changing the model is very expensive.

  • Push + pull: ordinary users keep the push model, while celebrity accounts use pull. A celebrity maintains their own timeline, and their fans pull from it themselves.

Follow and unfollow

After following a user, asynchronously merge their Timeline into your News Feed. After unfollowing a user, asynchronously remove their tweets from your News Feed. Asynchrony gives the user fast feedback, at the cost of the effect being slightly delayed.

Store likes


Designing a user system - understanding the database and cache

Design a user system covering registration, login, user information query, etc. The difficulty lies in data storage, including how to store friend relationships.

4S analysis
  • Scenario: registration, login, query, and modification of user information. Query is by far the biggest demand and needs the highest QPS.
  • Service: a registration/login module, a user-information query module, and a module responsible for storing friend relationships.

The impact of different QPS on storage systems

  • SQL databases such as MySQL: support roughly up to 1K QPS
  • On-disk NoSQL such as MongoDB: roughly up to 10K QPS
  • In-memory stores such as Redis/Memcached: around 100K QPS

Features of the user system

Reads vastly outnumber writes. For a read-heavy, write-light system you should consider cache optimization. Systems used by humans tend to read more and write less; systems used by machines (e.g. logging) usually write more and read less.

Cache usage: on a read miss, get the value from the DB and then set it into the cache. On a write, delete (invalidate) the cache entry first, then update the DB. A sketch follows.
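A minimal cache-aside sketch in Java matching the order described above, with an in-memory map standing in for Redis/Memcached and a hypothetical UserDao interface standing in for the database layer.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class UserService {
    interface UserDao {                       // illustrative DB access layer
        String findById(long id);
        void update(long id, String profile);
    }

    private final Map<Long, String> cache = new ConcurrentHashMap<>();
    private final UserDao db;

    UserService(UserDao db) { this.db = db; }

    // Read path: try the cache; on a miss, get from the DB and set the cache.
    public String getUser(long id) {
        String cached = cache.get(id);
        if (cached != null) return cached;
        String fromDb = db.findById(id);
        if (fromDb != null) cache.put(id, fromDb);
        return fromDb;
    }

    // Write path: invalidate the cache entry first, then update the DB.
    public void updateUser(long id, String profile) {
        cache.remove(id);
        db.update(id, profile);
    }
}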

Keeping users logged in: use a cookie/session. The user's browser sends its cookie along with every request to the server.


The Session table can live in the cache or in the DB. With many users, the cache is a useful optimization. Be careful, though: if sessions live only in the cache, a power failure would force a large-scale re-login and the database load would spike.

Friendship Store

Friendships can be stored without duplication, numbering the users and always storing the smaller id first and the larger id second; or with duplication, where A's friend list contains B and B's friend list also contains A.

If transactions are required, caches are generally not used.


Preventing single points of failure (sharding, backup)

Data sharding ensures that if one database goes down, the site does not become 100% unavailable. You can split by database and by table. Vertical splitting splits by business: break large tables into smaller ones and keep hot data in small tables, which helps avoid lock contention. Horizontal splitting handles huge tables; the key decisions are the choice of sharding key and the sharding method. The sharding key generally has to match the business's query patterns. Sharding methods include simple modulo, but such crude methods suffer from hot spots and are hard to scale out. A better method is consistent hashing, which minimizes the data that must be moved after adding machines.

Data backup means master-slave replication and clustering. You can also write to the master and read from the slaves to reduce load.

Consistent Hash Algorithm

Treat the whole hash range as a ring of size [0, 2^64 - 1], and introduce the concept of virtual nodes: each physical machine corresponds to, say, 1000 points on the ring. When we add a piece of data, we hash its key to a point on the ring and store it at the first virtual node found clockwise. When a new machine joins, each of its 1000 virtual nodes takes over data from the first virtual node clockwise from it, so only that data migrates.

  • Why 1000 points?
Because a single point per machine is easily uneven: with only three servers, 3000 points spread far more evenly than 3 points. Why not more? There is a trade-off: we have to store the position of every virtual node on the ring. The positions are stored in a TreeMap, which is essentially a red-black tree and finds the first virtual node whose hash is greater than the data's hash in O(log n). So too many virtual nodes increases storage and lookup cost. A sketch follows.
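A minimal Java sketch of consistent hashing with virtual nodes stored in a TreeMap, as described above; MD5 is used only as a convenient hash and the virtual-node count is illustrative.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.*;

public class ConsistentHash {
    private final TreeMap<Long, String> ring = new TreeMap<>();   // hash position -> server name
    private final int virtualNodes;

    public ConsistentHash(int virtualNodes) { this.virtualNodes = virtualNodes; }

    public void addServer(String server) {
        for (int i = 0; i < virtualNodes; i++) ring.put(hash(server + "#" + i), server);
    }

    public void removeServer(String server) {
        for (int i = 0; i < virtualNodes; i++) ring.remove(hash(server + "#" + i));
    }

    // Find the first virtual node clockwise from the key's position on the ring.
    public String locate(String key) {
        if (ring.isEmpty()) return null;
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(key));
        return (e != null ? e : ring.firstEntry()).getValue();    // wrap around the ring
    }

    private long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) h = (h << 8) | (d[i] & 0xff);
            return h;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}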

Backup and replication

Backups are generally periodic; replication is generally real-time. Take MySQL's master-slave model as an example: the principle is the Write-Ahead Log. Every write on the master is recorded in the log, and a dedicated I/O thread ships the log to the slave nodes, which replay it. Of course, master-slave replication lag needs to be considered here.

System Design - Design a short URL system

The short URL system is to map a relatively long URL such as www.baidu.com to http://goo.gl/2KEEaJ

  • Scenario: convert between long URLs and short URLs in both directions, and make sure that entering a short URL correctly redirects to the long URL.
  • Requirements: estimate QPS from DAU, consider the peak, and pick a suitable storage system accordingly. The required disk space can be calculated from the storage each URL needs.
  • Service: only a simple URL-conversion service.
  • Storage: both SQL and NoSQL work.

Short URL algorithm

  1. Take the last 6 characters of a hash of the URL (not feasible). It is very fast, but collisions cannot be resolved.
  2. Randomly generate a short URL and use the database to de-duplicate. Very simple to implement, but it gets slower and slower as the number of short URLs grows.
  3. Base-62 conversion. The usable characters are (0-9, a-z, A-Z), 62 in total, so the code can be treated as a base-62 number. Each URL corresponds to an integer, the auto-incrementing primary key of the database table. A 6-character code can represent 62^6, about 57 billion, URLs. The advantage is high efficiency; the drawback is the reliance on an auto-incrementing primary key, so only a SQL database can be used.

If the random-generation method is used with a SQL database, we need to index both URL columns separately for fast lookup in either direction; with NoSQL, create two tables storing the two directions as key-value pairs.

If base-62 conversion is adopted, you must use a SQL database with an auto-incrementing primary key. Only the primary key and the long URL need to be stored, because the short URL can be derived from the primary key (a sketch follows).
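A minimal sketch of the base-62 conversion between the auto-increment primary key and the 6-character short code; the padding and alphabet order are arbitrary choices for illustration.

public class Base62 {
    private static final String ALPHABET =
        "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";

    // id -> short code, e.g. 125 = 2*62 + 1 -> "000021"
    public static String encode(long id) {
        StringBuilder sb = new StringBuilder();
        while (id > 0) {
            sb.append(ALPHABET.charAt((int) (id % 62)));
            id /= 62;
        }
        while (sb.length() < 6) sb.append('0');   // pad so codes are 6 characters long
        return sb.reverse().toString();
    }

    // short code -> id
    public static long decode(String code) {
        long id = 0;
        for (char c : code.toCharArray()) {
            id = id * 62 + ALPHABET.indexOf(c);
        }
        return id;
    }
}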

Optimization (how to speed up)

First, add a cache layer to speed up reads, storing the most frequently converted long/short URL pairs; an eviction algorithm such as LRU can be used here.

Second, use geographic information to speed things up: optimize access by resolving users from different regions to different servers via DNS, and go further by storing Chinese websites in a Chinese data center and US websites in a US data center.

We also need to handle scaling. The bottleneck of this system is that it is too busy, not that the data will not fit. So we can shard the table horizontally, which again comes down to choosing the sharding key, and the requirements conflict: if you shard by ID you cannot quickly look up a given long URL in one shard, and can only broadcast the query to every shard. A better approach is to introduce an identifier prefix, e.g. AB123 -> 0AB123, where the 0 is computed as Hash(long_URL) % 62; then both directions can be located quickly. For multiple databases we also need consistent hashing.

For globally auto-incrementing IDs after sharding, a dedicated database can issue the IDs, or ZooKeeper can be used. This does not have to be a single point: for example, one issuer can hand out odd IDs and another even IDs.

Custom URL

We should create a separate CustomURLTable to store user-defined short names, rather than adding a column to the original table, which would leave a large number of empty cells.

Is it necessary to ensure one-to-one correspondence between long and short URLs?

A short URL must map to exactly one long URL, but a long URL does not need to map to exactly one short URL. We can keep a simple key-value pair with a 24-hour expiry: when a long URL arrives, check the cache first and return the cached short URL if present; otherwise just issue a new short URL.

301 or 302 redirect

This is also an interesting topic. It first tests a candidate's understanding of 301 vs 302 and of the browser caching mechanism, and then their business experience. 301 is a permanent redirect, 302 is a temporary redirect. A short URL never changes once generated, so using 301 matches HTTP semantics and also reduces the load on the server to some extent, since the browser caches the redirect.

But with 301 we cannot count clicks on the short URL, and click counts are a very interesting data source for analytics; a lot can be learned from them. So although choosing 302 increases server load, I think it is the better choice.

Of course, we generally do not allow short URLs to expire.

System Design-Geographical Information Service

Many apps need location information to find things closest to the user's current position and return them, typically Didi, Dianping and other local-life apps.

  • Scenario: the system must know each driver's location in real time, so drivers report their positions; when a user requests a ride, the nearest available driver is matched based on the user's location. Uber is therefore a write-intensive workload, because drivers report their positions every 4 s. With 200k drivers that is 50k writes per second on average, peaking around 150k. For storage, saving every reported location is about 200k * (86400 / 4) * 100 bytes, roughly 0.5 TB per day; recording only the current position is about 200k * 100 bytes = 20 MB.
  • Service: two large modules, one for location recording and one for location matching.
  • Storage: location storage is write-heavy, while order queries are read-heavy. Geographic data is simple and can be stored as key-value pairs, so a fast store like Redis works well. Order information involves more complex tables, so a SQL database is better there.

The order table and driver table in SQL can be designed following normal object-oriented design.


How to store and query geographic location information

Two algorithms are commonly used. One is Google's S2, which uses the Hilbert curve to map the address space to integers in [0, 2^64); two points that are close in space generally map to nearby integers. This method has rich library support and is more accurate. The other is GeoHash, which converts a location into a string; the longer the common prefix of two strings, the closer the two points generally are. The algorithm repeatedly bisects the latitude and longitude ranges; a concrete implementation is given below.

Geographic locations could be stored in a SQL database with an index on the geohash column and prefix queries via LIKE. But this does not fit SQL best practices: an indexed column should not be written frequently, which fragments the index and wastes space, and LIKE queries are relatively slow, so this is generally not recommended.

Since writes are frequent, we can use Redis with simple key-value pairs, where the key is a geohash prefix and the value is the set of driver IDs. For example, a driver at geohash 9q9hvt is stored under the three keys 9q9hvt, 9q9hv and 9q9h: a 4-character prefix already means roughly 20 km of precision, and nobody hails a ride more than 20 km away, while 6 characters is within about a kilometer, which is accurate enough. If the match succeeds (the driver is close enough, free, and accepts the order), the system returns the driver's ID, and we then query the database or another Redis table for the driver's exact latitude and longitude. In other words, we need to go from a location to the nearby drivers, and from a driver ID to that driver's location.
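A minimal sketch of this driver index, with an in-memory map of geohash prefix to driver-id set standing in for the Redis sets; in production each put/get here would be a Redis set operation on keys of length 4, 5 and 6. All names are illustrative.

import java.util.*;
import java.util.concurrent.*;

public class DriverLocationIndex {
    private final Map<String, Set<Long>> prefixToDrivers = new ConcurrentHashMap<>();
    private final Map<Long, String> driverToGeohash = new ConcurrentHashMap<>();

    // Called every few seconds when a driver reports a new position (6-char geohash).
    public void report(long driverId, String geohash6) {
        String old = driverToGeohash.put(driverId, geohash6);
        if (old != null) removeFromPrefixes(driverId, old);
        for (int len = 4; len <= 6; len++) {                      // e.g. 9q9h, 9q9hv, 9q9hvt
            prefixToDrivers
                .computeIfAbsent(geohash6.substring(0, len), k -> ConcurrentHashMap.newKeySet())
                .add(driverId);
        }
    }

    // Query: look up the rider's geohash prefix from finest to coarsest.
    public Set<Long> nearbyDrivers(String riderGeohash6) {
        for (int len = 6; len >= 4; len--) {
            Set<Long> drivers = prefixToDrivers.get(riderGeohash6.substring(0, len));
            if (drivers != null && !drivers.isEmpty()) return drivers;
        }
        return Set.of();
    }

    private void removeFromPrefixes(long driverId, String geohash6) {
        for (int len = 4; len <= 6; len++) {
            Set<Long> s = prefixToDrivers.get(geohash6.substring(0, len));
            if (s != null) s.remove(driverId);
        }
    }
}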

  • Scaling: to remove the single point of failure and the performance bottleneck, use multiple Redis instances and distribute traffic sensibly. A good idea is to partition by city, which reduces to the classic problem of deciding whether a point lies inside a polygon. To decide, for example, whether a user is inside an airport promotion area, first determine the city, then check whether the point falls inside the promotion polygons configured for that city.

Implement GeoHash algorithm

GeoHash uses base 32: the digits 0-9 and the letters a-z with (a, i, l, o) removed. The Peano curve is used to cover the plane, and the binary codes of longitude and latitude are interleaved when building the string. 32 symbols are chosen because 32 is exactly 2 to the 5th power, which keeps the encoding convenient while preserving accuracy.

Because GeoHash divides the area into regular rectangles and encodes each rectangle, querying nearby POIs has a boundary problem. Imagine our location with two restaurants nearby: the farther restaurant may share our GeoHash code because it falls in the same cell, while the closer one has a different code. Points that are close in distance but differ in code typically occur near cell boundaries.

To solve this, consider all eight surrounding cells: search both our own cell and its eight neighbors, then return the points that qualify. How do we get the neighbors' prefixes? Looking at adjacent cells, their binary codes differ by 1 in the longitude or latitude part; so decode the GeoHash into its longitude and latitude binary codes, add or subtract one from the relevant code (or both), and re-encode to get the neighbor's GeoHash.

The GeoHash algorithm uses the Peano space-filling curve, which has abrupt jumps, so codes that look similar can occasionally be far apart. Therefore, when querying nearby restaurants, first select candidate POIs by similar GeoHash codes, then compute the actual distances.

public class GeoHash {
    /*
     * @param latitude: one of a location coordinate pair 
     * @param longitude: one of a location coordinate pair 
     * @param precision: an integer between 1 to 12
     * @return: a base32 string
     */
    private String _base32 = "0123456789bcdefghjkmnpqrstuvwxyz"; // base32 alphabet: 0-9 and a-z without a, i, l, o
    public String encode(double latitude, double longitude, int precision) {
        // GeoHash encoding: convert latitude/longitude to binary, interleave the bits,
        // then map every 5 bits to one base32 character. The longer the common prefix
        // of two codes, the smaller the positional error between the two points.
        String x = getBin(latitude, -90.0, 90.0);
        String y = getBin(longitude,-180.0, 180.0);
        StringBuffer sb = new StringBuffer();
        for (int i = 0;i<30 ;i++){
            sb.append(y.charAt(i));
            sb.append(x.charAt(i));
        }
        StringBuffer ans = new StringBuffer();
        for (int i = 0;i<60;i+=5){
            ans.append(_base32.charAt(Integer.parseInt(sb.substring(i,i+5),2)));
        }
        return ans.toString().substring(0,precision);
    }

    private String getBin(double position, double left, double right){
        StringBuffer sb = new StringBuffer();
        // Why 30 bits? The final string is at most 12 characters, which needs 12*5 = 60 bits,
        // and since longitude and latitude are interleaved, each contributes 30 bits.
        for(int i = 0;i<30;i++){
            double mid = (left+right)/2.0;
            if (position<=mid){
                sb.append("0");
                right = mid;
            }else{
                sb.append("1");
                left = mid;
            }
        }
        return sb.toString();
    }
    public double[] decode(String geohash) {
        // GeoHash decoding: each character contributes 5 bits that alternately
        // refine the longitude and latitude intervals.
        int[] mask = {16, 8, 4, 2, 1};
        double[] x = {-180, 180};
        double[] y = {-90, 90};
        boolean is_even = true;
        for (int i = 0; i < geohash.length(); i++){
            int index = _base32.indexOf(geohash.charAt(i));
            for(int j = 0; j<5;j++){ // 5 bits per character, because the alphabet has 32 = 2^5 symbols
                if (is_even){
                    refine_interval(x, index, mask[j]);
                }else{
                    refine_interval(y, index, mask[j]);
                }
                is_even = !is_even;
            }
        }
        double[] location = {(y[0]+y[1])/2.0, (x[0]+x[1])/2.0};
        return location;
    }

    private void refine_interval(double[] pos, int val, int mask){
      // Decide whether the current bit selects the lower or upper half of the interval
      if((val & mask) != 0){
          pos[0] = (pos[0] + pos[1])/2.0;
      }else{
          pos[1] = (pos[0] + pos[1])/2.0;
      }
    }
}

System design-real-time chat system

The basic problem is to design a WeChat-style chat system.

  • Scenarios and main functions: sending messages between users, group chat, and user status. Facebook has about 1 billion monthly active users, and daily actives are roughly 75% of that, i.e. hundreds of millions of DAU. Taking a 100M-scale DAU for estimation and assuming each user sends 20 messages a day, QPS = 100M * 20 / 86400, roughly 20K. For storage, each record is about 30 bytes, so 2B messages a day is about 60 GB.
  • Service: a Message service responsible for the basic transmission and storage of messages; a Realtime service, mainly responsible for delivering messages to readers.
  • Storage:

For a chat application, message storage is the key. The naive idea is a single table with from_user_id (sender), to_user_id (receiver), content, and created_at. But this design has obvious shortcomings: to query the conversation between two people we need
SELECT * FROM message_table WHERE (from_user_id = A AND to_user_id = B) OR (from_user_id = B AND to_user_id = A) ORDER BY created_at DESC;

This query is very inefficient, and for group chats the approach barely works at all. In an application such as WeChat, every interaction is a two-way exchange of sending and receiving, so a better idea is to model conversations explicitly: introduce a session (thread) ID that identifies each conversation. We design a Message table that stores the messages of all users of the entire app, plus a per-user Thread table; the two tables are joined by the thread ID. The advantage of this design is that the Message table makes it easy to query all messages of a conversation: the user must pass the thread ID when fetching a conversation, so the lookup is simple. The Thread table also holds each user's per-conversation settings, such as chat notes and mute flags. In other words, all users' messages go into one shared table, and all users' conversations go into another.


For the Message table the best sharding key is the thread ID: as mentioned above, almost every query carries the conversation ID, which is exactly the property a sharding key should have (choose the sharding key from the business side: the parameter that most queries carry). For the Thread table, owner_id suits better as the sharding key, because that table mainly serves users asking "which conversations do I have". The primary key of the Thread table should be [owner_id, thread_id], which is worth indexing; we also need an index on owner_id + updated_at, because conversations are shown to users sorted by update time.

The Message table is large but simple and does not need an auto-increment ID; each message is effectively a log entry. A persistent NoSQL store works well, with the thread ID as the key and {user_id, content, created_time} JSON as the value. The Thread table needs indexes and frequent search and filter operations, so SQL is the better fit.

The flow of users sending and receiving messages

  • Sending a message (Message service):

First, the client sends the message and its recipient to the server; here consider a 1-to-1 conversation. For a group chat, the client generally checks whether it already has the corresponding session ID locally: if so, it sends the session ID directly to the server, otherwise it asks the server to create one (asking the server to look up whether a session already exists among several participants is troublesome, since it would require indexing the participants in the Thread table). The server then creates a message containing {user_id, content, created_time}. There are some clever optimizations when generating thread IDs: for a conversation between two people, the ID can be a combination of the two user IDs, which makes it easy to find; a group-chat ID can be creator + creation time, and so on. See the small sketch below.
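A tiny sketch of the thread-id conventions just mentioned; the naming scheme is purely illustrative.

public class ThreadIds {
    // 1-to-1 chat: smaller id first, so both sides compute the same thread id without asking the server.
    public static String directThreadId(long userA, long userB) {
        return Math.min(userA, userB) + "_" + Math.max(userA, userB);
    }

    // Group chat: creator id plus creation timestamp.
    public static String groupThreadId(long creatorId, long createdAtMillis) {
        return creatorId + "_" + createdAtMillis;
    }
}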

  • Receiving messages (Realtime service):

The simpler method is polling: the client asks the server every 5 s whether there is new information. This is easy but has noticeable delay. Sockets solve this: the server provides a push service that keeps a long-lived connection with the client. When the user opens the app, it connects to its own socket in the push service; when someone sends a message, the Message service receives it and delivers it through the push service. If a user has not opened the app for a long time, the connection can be dropped to free the port (Android GCM and iOS APNs can take over). The core difference between a socket and HTTP is that with HTTP only the client can request data from the server, while with a socket the server can actively push data to the client. Architecturally, message sending and message pushing can be split into two modules.

  • Group chat between users

Group chats can be huge, and every message has to reach many members, but many of them are actually offline, and only the push service knows who is online. So we add a Channel service in the middle. For large group chats, online users first subscribe to the corresponding channel: when a user comes online, the channels of the groups they belong to are marked, so each channel knows which of its users are alive; when the user goes offline, the push service notifies the channel. When a message is sent, it goes to the channel, and the channel pushes it to the online users. This layer lives in memory and is simply rebuilt on restart.

  • Querying user online status: the server needs to know which users are online at any moment, and users need to know which of their friends are online. Again there are two approaches, push and pull.

Push means the user tells the server when they go online or offline. The drawback is that after a sudden network failure the client cannot tell the server anything. The server then periodically tells every user which of their friends are online.

The pull approach is better and still uses heartbeats: after coming online, the client sends a heartbeat to the server every few seconds to show it is alive, and whenever needed it asks the server for its friends' online status. This also makes it easy to know how long a user has been offline. See the sketch below.
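A minimal sketch of heartbeat-based presence; the threshold and names are illustrative assumptions.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PresenceService {
    private static final long ONLINE_THRESHOLD_MS = 10_000;   // e.g. a couple of missed heartbeats
    private final Map<Long, Long> lastHeartbeat = new ConcurrentHashMap<>();

    // Called by the client every few seconds while the app is open.
    public void heartbeat(long userId) {
        lastHeartbeat.put(userId, System.currentTimeMillis());
    }

    // Called when a user asks which of their friends are online.
    public boolean isOnline(long userId) {
        Long last = lastHeartbeat.get(userId);
        return last != null && System.currentTimeMillis() - last < ONLINE_THRESHOLD_MS;
    }
}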

System design - rate limiting

Rate limiting comes up constantly, e.g. limiting how many times a page can be refreshed in a short period. For ordinary business, rate limiting can be implemented by simply keeping counts in a database, but for flash-sale (spike) traffic the database approach is far too slow, so a better strategy is needed. From an engineering standpoint, Google's open-source Guava library provides a RateLimiter tool that uses the token-bucket algorithm and effectively limits incoming traffic. This module can itself be treated as a small system design problem, and we can analyze it with the 4S method.

  • Scenario: limit a user's behavior, e.g. allow at most N operations in the last 30 s and block anything beyond that.
  • Service: it is already the smallest possible module and cannot be split further.
  • Storage: the counters can also be used to analyze site traffic. Strong consistency and perfect accuracy are not required, but read/write rates are very high, so Redis's fast access structures are a good fit.

Design ideas:

  1. The simplest design uses Redis key expiry. For example, to allow at most 10 operations per minute, use the current minute of the system time as part of the key and set the expiry to 2 minutes so the full minute's count is kept. Whatever granularity we need to limit at becomes the granularity of the key. It is not perfectly accurate: in the worst case, up to twice the allowed traffic can slip through across a window boundary, but rate limiting rarely needs to be exact. (A sketch follows this list.)
  2. Another strategy keys at the finest granularity, e.g. per second, with a minute-level expiry; to count a minute's traffic, loop over and sum the per-second buckets, much like writing into a circular array, which gives the last minute's traffic quite accurately. In general, to count minute-level traffic the key is second-level and the expiry minute-level; for day-level counts the key is hour-level and the expiry day-level, which keeps the number of buckets looped over small.
Of course this still introduces some error. If an exact count is needed, use a multi-level query. For example, to count visits in the last 24 hours when the current time is 23:30:33, sum the second-level buckets 23:30:00 to 23:30:33 (34 queries), the minute-level buckets 23:00 to 23:29 (30 queries), the hour-level buckets 00 to 22 of today (23 queries), plus yesterday's remaining second-level and minute-level buckets (roughly 26 seconds' and 29 minutes' worth).
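A minimal in-memory sketch of the first idea above, counting events in a per-minute bucket keyed by "user:action:minute". With Redis the same shape becomes an INCR on that key plus a roughly two-minute expiry; here a map and a crude cleanup stand in for that.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FixedWindowLimiter {
    private final int limitPerMinute;
    private final Map<String, Integer> counters = new ConcurrentHashMap<>();

    public FixedWindowLimiter(int limitPerMinute) { this.limitPerMinute = limitPerMinute; }

    public synchronized boolean allow(String userId, String action) {
        long minute = System.currentTimeMillis() / 60_000;            // current minute bucket
        String key = userId + ":" + action + ":" + minute;
        int count = counters.merge(key, 1, Integer::sum);             // INCR equivalent
        counters.remove(userId + ":" + action + ":" + (minute - 2));  // crude stand-in for key expiry
        return count <= limitPerMinute;
    }
}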

Google's RateLimiter source code

It uses the token-bucket design, which can limit the number of submitted threads or the amount of work executed. Its most distinctive feature is the delayed-waiting strategy: the number of permits a request asks for does not delay that request itself. For example, a request for 100 permits is granted immediately, but subsequent requests pay for it by waiting. A short usage sketch follows.
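A short usage sketch of Guava's RateLimiter, assuming the Guava library is on the classpath; the rate of 10 permits per second and the loop body are arbitrary illustrative values.

import com.google.common.util.concurrent.RateLimiter;

public class RateLimiterDemo {
    public static void main(String[] args) {
        RateLimiter limiter = RateLimiter.create(10.0);    // 10 permits per second (token bucket)
        for (int i = 0; i < 100; i++) {
            limiter.acquire();                              // blocks until a permit is available
            System.out.println("handled request " + i);
        }
    }
}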

Explain RateLimiter in detail

System design-datadog, website visit data statistics

We need to count website traffic for use in system analysis. For a given web page, each visit increments a counter, and we need to be able to query the counts over recent periods.

  • Storage: this is a counting problem, which means the system is almost all writes and almost no reads, and the counts must be persisted. The function is simple: use "page name + timestamp" as the key and add 1 on each visit. The important question is storage granularity: for the most recent week we may need minute-level data, for this month ten-minute level, for this year hour-level, and for last year day-level. So we can borrow the multi-bucket idea and periodically aggregate and re-organize the data.

Considering that reporting every event individually would be a huge QPS when many services are monitored, the client/agent can aggregate counts locally for, say, 15 s and then report them to Datadog in one batch. The data also needs periodic retention and aggregation to shrink old data.

System design-web crawler system

A crawler system helps a company collect information and exercises knowledge of multithreading and system design. A crawler fetches the page source of a URL and extracts the text it needs. All pages must be re-crawled periodically to pick up updates, so the system needs a lot of storage.

  • Scenario: on the order of a trillion web pages, about 1.6M pages crawled per second if everything is refreshed weekly. At roughly 10 KB per page, storing them takes about 10 PB.
  • Service: the crawling workers, a task service, and a storage service.
  • Storage: the crawl task table can live in a DB, but the crawled pages themselves are better stored in something like BigTable.

Implementation

We go straight to a multithreaded design. Start from several seed URLs; when a crawler worker takes a URL, it fetches the page source and can use regular expressions to extract specific content, for example titles wrapped in tags matched by <h3[^>]*><a[^>]*>(.*?)<\/a><\/h3>. For multithreading, BFS-style traversal is the convenient choice. But we cannot use an in-memory queue to distribute tasks: there are far too many URLs to fit in memory, and a plain queue makes it hard to control crawl priorities, since some news pages should be crawled more often. So we store a task table in a DB, which lets us schedule crawling in a much more customized way.


Upgrade strategy

  • How to control the crawl frequency? Compare each crawl with the previous one: if the page content changed, double the crawl frequency; if nothing changed, reduce it. This adjusts the refresh rate dynamically.
  • How to avoid crawler traps and dead loops? A site may redirect internally without end, so impose a quota, for example limit qq.com to at most 10% of the crawl.
  • Crawlers are deployed in multiple regions: crawl US websites from the United States and Chinese websites from China.

System design - search typeahead (spelling suggestion)

When we type in an input box or a search engine, suggestions pop up as we type. This is essentially a top-K query for a given prefix.

  • Scenario: an application with 500M DAU, each user searching about 6 times a day with about 4 keystrokes per search, so QPS is roughly 4 * 6 * 500M / 86400 = 138K, with peaks around 2 to 3 times that.
  • Services: a query service and a data-collection service. The query service returns the top K completions for the user's current prefix; the data-collection service gathers recent hot queries, then sorts and stores them.
  • Storage: the query service must serve from memory, with a cache in front for further speedup; the working data can be a trie (dictionary tree), while the complete raw data is logged to disk.

With a trie it is easy to find the current high-frequency words for a prefix, a space-for-time trade. This structure lives in memory and belongs to the query service; it is periodically serialized to disk. A sketch follows.
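A minimal sketch of such a trie, where every node caches the top hot queries under its prefix; the value of K, the counts and the rebuild policy are illustrative assumptions.

import java.util.*;

public class TypeaheadTrie {
    private static final int K = 10;

    private static class Node {
        Map<Character, Node> children = new HashMap<>();
        List<String> topK = new ArrayList<>();     // hottest queries under this prefix
    }

    private final Node root = new Node();
    private final Map<String, Integer> frequency = new HashMap<>();

    // Rebuilt periodically from the query logs by the data-collection service.
    public void add(String query, int count) {
        frequency.merge(query, count, Integer::sum);
        Node cur = root;
        for (char c : query.toCharArray()) {
            cur = cur.children.computeIfAbsent(c, ch -> new Node());
            insertTopK(cur.topK, query);           // keep this prefix's cached top-K up to date
        }
    }

    // Query service: walk down the prefix and return the cached list.
    public List<String> suggest(String prefix) {
        Node cur = root;
        for (char c : prefix.toCharArray()) {
            cur = cur.children.get(c);
            if (cur == null) return List.of();
        }
        return cur.topK;
    }

    private void insertTopK(List<String> topK, String query) {
        if (!topK.contains(query)) topK.add(query);
        topK.sort((a, b) -> frequency.get(b) - frequency.get(a));
        if (topK.size() > K) topK.remove(topK.size() - 1);
    }
}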


User searches are recorded as logs, which the DataCollectionService aggregates periodically: for example, every hour it rebuilds a new trie from the sorted data and hot-swaps it in.

  • Extensions
  1. Performance evaluation: the time from typing a prefix to showing suggestions is one metric; the hit rate is another.
  2. How to improve response time? First, use browser caching; second, pre-fetch: when the user types "ab", fetch not only the top 10 for "ab" but also the top 10 for each of those suggestions, i.e. about 100 entries, so later keystrokes are answered instantly.
  3. How to handle a trie that is too large? Use consistent hashing across several query-service machines: when the user types "a", the hash routes the query to machine 1; when they type "ab", it may route to machine 2.
  4. The log files take too much space? Compress the logs probabilistically: whenever someone queries "amazon", record it only with probability 1 in 1000 (and scale the counts back up). The premise is that we only care about high-volume queries anyway.

Distributed File System (GFS)

Google's "three musketeers": GFS (Google File System) for storage, MapReduce for fast data processing, and Bigtable connecting the underlying storage with the upper-layer data.

The commonly used distributed file system today is HDFS, an open-source system that mainly evolved from GFS.

  • Scenario: users need to write and read files, and the files can be very large, so multiple machines must cooperate to store them.
  • Service: a client + server model. The client splits files, and the servers handle storage; how to store the data is the core question here.

Common models include the peer-to-peer model, which has no central node. The advantage is that there is no bottleneck and any node can die without stopping the system; the difficulty is keeping the replicas consistent and synchronized.

The master-slave model has a central node and the rest are slaves. The master is only responsible for control and does no storage; all storage is handled by the slave nodes, much like sentinel mode. The advantage is that data consistency is easy to maintain; the disadvantage is that the master becomes a performance bottleneck, and if it dies it must be restarted immediately.

  • Storage: large files go in the file system. Ordinary data is usually stored as relatively small files; very large files may need to be split into chunks and are best kept in a dedicated file system.

A large file consists of metadata (file name, creation and modification time, size, and so on) and the actual content. It is best to store the two separately: the metadata is small and frequently accessed, so it can be loaded straight into memory, while the file body stays on disk. Windows historically used contiguous storage; Linux uses the better approach of separate blocks.

The advantage of separate storage is that it avoids fragmentation of free space. Large files are split into chunks of about 64 MB each. The benefit is that the metadata stays small, since the metadata only records where each chunk lives; the cost is a little fragmentation. With the master-slave model the real data sits on the slaves and the master stores only the metadata, which records which server holds each chunk; the offset within the server does not need to be in the metadata and can be kept with the chunk itself.

The metadata for a 64 MB chunk is only about 64 B, so even for a very large file the metadata can be read into memory.


Write: the file is split and written chunk by chunk, and the splitting is done on the client, which keeps the cost of retransmitting after an error small. The client first talks to the master, which, based on disk usage and how busy each node is, tells the client which slaves to store the chunks on; the client then talks to those slaves directly.


Modification: GFS itself does not support in-place modification of a file; a "modification" is in fact writing a new copy in new space.

Read: the client talks to the master, the master tells it where each chunk lives, and the client reads from the slaves.

So the master's job is to store the metadata of all files, maintain the map (file name + chunk index -> chunk server), and do the assignment during reads and writes.

  • Extensions:

Q: Isn't a single master a bottleneck? A single master is the strategy used by 90% of the industry because the design is much simpler. If necessary you can upgrade to multiple masters, coordinated with an algorithm such as Paxos.

Q: How is data verified? With checksums, e.g. simply MD5, appended to the end of each chunk. A checksum is tiny: 4 bytes = 32 bits, so for a 1 PB file the checksums total 1 PB / 64 MB * 32 bit = 62.5 MB. The checksum is verified on read; if it does not match, the chunk is repaired from another replica.

Q: How do we detect whether a slave node is working normally? The slave sends periodic heartbeats to the master.

Q: How are replicas written during the write process? A better method is for the client to write to only one slave per transfer; that slave becomes the "team leader" and is responsible for replicating to the other replicas, much like the sentinel + cluster design. The leader can be chosen as the geographically nearest or the least busy node, and it need not be fixed: each client request can have a different leader.

Real question

Q: Design a read-only lookup service. The backing data is 10 billion key-value pairs; the service accepts a key from the user and returns the corresponding value. Each key is 0.1 KB and each value is 1 KB. Required: QPS >= 5000, latency < 200 ms.

Server parameters: commodity servers with 8 CPU cores, 32 GB memory and a 6 TB disk each. Any number of servers may be used to design this service.

A:

total key size ~ 10 billion * 0.1kB = 1T;
total value size ~ 10 billion * 1kB = 10T.
So each server uses two hard drives, a total of 12T. The data structure uses SSTable (Sorted String Table).

To make full use of memory I first considered a binary search tree as the index, but since the service is read-only and the disk stores the key-value pairs as a sorted SSTable with fixed-length keys and values, it is enough to keep the keys sorted in memory: binary-search the keys, and use the key's position in memory to compute the key-value pair's offset on disk (see the sketch below). 1 TB / 32 GB = 31.25, so 32 servers are needed in total to hold the key index, with a master in front managing consistent hashing. lg(32G) = 35, so an average key lookup is about 18 memory accesses, roughly 1800 ns, negligible at millisecond scale.
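A minimal sketch of this lookup, assuming fixed-size values and a hypothetical disk-read helper; the class and method names are illustrative.

import java.util.Arrays;

public class ReadOnlyKVStore {
    private static final long VALUE_SIZE = 1024;   // each value is 1 KB, fixed size

    private final String[] sortedKeys;             // loaded into memory at startup, sorted

    public ReadOnlyKVStore(String[] sortedKeys) { this.sortedKeys = sortedKeys; }

    public byte[] get(String key) {
        int idx = Arrays.binarySearch(sortedKeys, key);   // O(log n) memory accesses
        if (idx < 0) return null;                          // key not present
        long diskOffset = (long) idx * VALUE_SIZE;         // fixed-size values give a direct offset
        return readValueAt(diskOffset);
    }

    // Stand-in for one random disk read (seek + rotation + sequential read of 1 KB).
    private byte[] readValueAt(long offset) {
        throw new UnsupportedOperationException("disk read omitted in this sketch");
    }
}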

For each request, reading a 1 KB value from disk costs: 10 ms (disk seek) + 4 ms (rotational delay at 7200 rpm) + 1 kB/1 MB * 30 ms (sequential read) = 14 ms. So one server handles QPS: 1000 ms / 14 ms = 71, and the total QPS is 71 * 32 = 2272, still more than twice short of the requirement. So give each server six 6 TB disks forming 3 sets of data disks; the 3 sets can serve 3 requests in parallel, making slight use of the 8-core CPU. The QPS is then 2272 * 3 = 6816.

delay:

  • Master in-memory lookup of the consistent-hashing map: negligible
  • Round trip between master and slave: 1 round trip within the same data center is about 1 ms
  • Slave in-memory index lookup: negligible
  • Slave disk read: 14 ms

so total latency is 15ms.

Map Reduce (counting in big data scenarios)

MapReduce, GFS and Bigtable are the three carriages of Google's big data stack.

Problem background: count the frequency of words across all websites. A single machine hits a performance bottleneck, so a big-data approach is used: multiple machines handle the two phases, map and reduce. Map counts the words in each article; reduce merges the counts for each specific word.

The whole pipeline has six stages: input (set the input files), split (the framework distributes the files evenly across machines), map (split articles into words), transfer and sort (shuffle), reduce (aggregate per word), and output (write the output files). The big-data framework provides the scaffolding; we only implement map and reduce, paying attention to their inputs and outputs. Map input: key = the article's storage address, value = the article's content. Reduce input: key = the key emitted by map, values = the values emitted by map for that key.

/**
 * Definition of OutputCollector:
 * class OutputCollector<K, V> {
 *     public void collect(K key, V value);
 *         // Adds a key/value pair to the output buffer
 * }
 */
public class WordCount {
    public static class Map {
        public void map(String key, String value, OutputCollector<String, Integer> output) {
            // key: the storage address of the article
            // value: the content of the article
            String[] tokens = value.split(" ");
            for(String word:tokens){
                output.collect(word,1);
            }
            // Output the results into output buffer.
            // Ps. output.collect(String key, int value);
        }
    }

    public static class Reduce {
        public void reduce(String key, Iterator<Integer> values,
                           OutputCollector<String, Integer> output) {
            int sum = 0;
            while(values.hasNext()){
                sum += values.next();
            }
            output.collect(key,sum);
            // Output the results into output buffer.
            // Ps. output.collect(String key, int value);
        }
    }
}

The number of map and reduce machines is up to you, but more is not always better: more machines increase parallelism, yet starting them has a cost, the number of reducers has an upper bound (the number of distinct words), and blindly adding machines also increases the pressure on the master.

When counting word frequencies there is a stopword list of high-frequency, meaningless words to filter out, such as "I", "you", "the"; these are simply skipped during counting.


Sorting here can only be external sorting, because the data is far too large to load into memory and sort directly. The basic idea of external sorting: split the large file into pieces that fit in memory, sort each piece in memory, then merge the sorted pieces with a K-way merge.

Typical problems
  1. Inverted index: count which articles each word appears in. Map emits, for each article, the words it contains; reduce groups by word and outputs the inverted index.
  2. Anagrams (similar words): split a large number of words, map each word to a canonical key such as its letters sorted, and reduce groups the words that share the same sorted form.

System Design of Map Reduce

Both input and output files need to be placed on GFS; the intermediate data only needs to sit on local disks and needs no special handling.

  • Q: In what order do mapper and reducer run? The mappers must finish before the reducers can start.
  • Q: What if a mapper or reducer dies mid-run? There is a machine pool, and the master assigns machines to tasks; if one dies, the master simply reassigns the task to another machine.
  • Q: What problems arise when one key has a very large count, and how to solve them? It causes skew: one reducer runs far longer than the rest. Append a random suffix to the key, which effectively splits that key into several buckets.

Distributed database Bigtable

Bigtable addresses the NoSQL database problem at big-data scale. A file system takes an address and returns a file; a database system retrieves specific data from files (tables) organized according to certain conventions, and its core is querying data. Normally a database system is built on top of a file system, responsible for organizing and storing data in it, while exposing a more convenient interface for manipulating that data.

  • Scenario (requirement): query a key and return the corresponding value. The scenario is simple. The data is not stored in tabular form but, for example, as JSON records, serialized and stored in files.

How do we search the files on disk? The data is too large to read entirely into memory, so a feasible idea is to keep the data sorted on disk and binary-search the file on each disk. This applies the idea of partial sorting: the on-disk binary search also reads in one portion at a time.

How do we handle modification? Editing a file in place is not possible, because the modified record may not be the same size, and copying the entire file is far too inefficient. The feasible approach is to append directly at the end of the file. That creates new problems: the appended data is out of order, and a key may now have multiple versions, making the correct record hard to find; only periodic compaction puts the files back in order. So newly written data is first kept in memory, and when it reaches a certain size it is sorted in memory once and then written to disk for persistence. The advantage is that each file on disk is a small block that is internally sorted, and only the most recent writes, still in memory, are unsorted. When looking up a key, we loop over the disk files and binary-search each one; a timestamp on each record ensures we pick the newest version. Since this leaves redundant versions around, we periodically K-way merge and compact the data.

Read and write process of the complete system

In the write path, the starting offset of each file block is kept in memory. A write first goes into memory, using a skip list as the data structure, and is later serialized to disk so the on-disk files stay sorted. To guard against losing the in-memory data on a power failure, we additionally use a WAL (write-ahead log); the WAL is also a disk write, but an append, so it is very fast. So a write costs an in-memory sorted insert + one sequential disk write on flush + one log append. See the sketch below.
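A minimal sketch of this write path, with a TreeMap standing in for the skip list and the flush routine omitted; the threshold and file format are illustrative assumptions.

import java.io.FileWriter;
import java.io.IOException;
import java.util.TreeMap;

public class MemTable {
    private static final int FLUSH_THRESHOLD = 100_000;

    private final TreeMap<String, String> memtable = new TreeMap<>();   // kept sorted in memory
    private final FileWriter wal;

    public MemTable(String walPath) throws IOException {
        this.wal = new FileWriter(walPath, true);    // append-only write-ahead log
    }

    public synchronized void put(String key, String value) throws IOException {
        wal.write(key + "\t" + value + "\n");        // 1. WAL append, survives a crash
        wal.flush();
        memtable.put(key, value);                    // 2. in-memory sorted write
        if (memtable.size() >= FLUSH_THRESHOLD) {
            flushToSSTable();                        // 3. one sequential disk write, already sorted
            memtable.clear();
        }
    }

    // Stand-in: write all entries of the memtable to a new sorted file (SSTable).
    private void flushToSSTable() throws IOException {
        // omitted in this sketch
    }
}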


In the read path, first check memory; if the key is there, return the newest value directly. Otherwise binary-search each disk file. To speed this up, each file carries its own index: files are internally sorted but not sorted relative to each other, so the index (kept in memory, or on disk and read in batches if too large) quickly locates the position within a file block to read. SSTable = Sorted String Table.


How do we quickly check whether a key could be in a file at all, so that files which cannot contain it are skipped? A hashmap would work but takes too much space at this scale; a Bloom filter is better. A Bloom filter is a bit array plus several hash functions: to add a key, each hash function maps it to a position and that bit is set; to query, check whether all those positions are 1. Hence its characteristic behavior: "definitely not present" is always correct, while "possibly present" can be a false positive. The false-positive rate depends on the number of hash functions, the length of the bit array, and the number of keys added. A sketch follows.
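A minimal Bloom filter sketch matching this description; the sizing and the way the k hashes are derived from one hash code are illustrative choices.

import java.util.BitSet;

public class BloomFilter {
    private final BitSet bits;
    private final int size;
    private final int numHashes;

    public BloomFilter(int size, int numHashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.numHashes = numHashes;
    }

    public void add(String key) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(indexFor(key, i));            // set one bit per hash function
        }
    }

    // false -> the key is definitely not in the file, skip it
    // true  -> the key may be present (false positives are possible)
    public boolean mightContain(String key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(indexFor(key, i))) return false;
        }
        return true;
    }

    private int indexFor(String key, int i) {
        int h = key.hashCode() * 31 + i * 0x9E3779B9;   // derive the i-th hash from one hash code
        return Math.floorMod(h, size);
    }
}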

Clustered Bigtable+GFS

How is clustering implemented? Again a master handles control operations such as consistent hashing and directs the client's reads and writes. If the data keeps growing and a single machine's disk cannot hold or serve it, combine with GFS: split Bigtable's SSTables into small chunks and write them into GFS.


Bigtable's slaves are usually called tablet servers, i.e. the slave servers that store the tablets. Because Bigtable has modification operations, contention must be handled, so a distributed lock is introduced, such as Google's Chubby or Hadoop's ZooKeeper. When a client reads, the request goes to the lock service; after consistent hashing, it locks the corresponding node, returns the node information to the client for reading and writing, and releases the lock when done. In other words, the lock service has taken over part of the master's original duties: the master's main tasks now are to monitor slave health and perform sharding at startup, while the rest of the metadata work is handled by the lock service.

So the whole system consists of Client + Master + Tablet Server + Distributed Lock, where the distributed lock also updates metadata (index information, etc.) and locks and unlocks keys.

Throughout the design, writes are made fast by append-only writing and SSTables, and reads are made fast by binary search on disk plus in-memory metadata: the index and the Bloom filter.